diff options
Diffstat (limited to 'Lib')
189 files changed, 3582 insertions, 1088 deletions
diff --git a/Lib/_dummy_thread.py b/Lib/_dummy_thread.py index ed50520..13b1f26 100644 --- a/Lib/_dummy_thread.py +++ b/Lib/_dummy_thread.py @@ -24,11 +24,7 @@ TIMEOUT_MAX = 2**31 # imports are done when needed on a function-by-function basis. Since threads # are disabled, the import lock should not be an issue anyway (??). -class error(Exception): - """Dummy implementation of _thread.error.""" - - def __init__(self, *args): - self.args = args +error = RuntimeError def start_new_thread(function, args, kwargs={}): """Dummy implementation of _thread.start_new_thread(). diff --git a/Lib/_pyio.py b/Lib/_pyio.py index 35dea41..a3b89e7 100644 --- a/Lib/_pyio.py +++ b/Lib/_pyio.py @@ -1511,6 +1511,7 @@ class TextIOWrapper(TextIOBase): self._decoded_chars_used = 0 # offset into _decoded_chars for read() self._snapshot = None # info for reconstructing decoder state self._seekable = self._telling = self.buffer.seekable() + self._b2cratio = 0.0 if self._seekable and self.writable(): position = self.buffer.tell() @@ -1678,7 +1679,12 @@ class TextIOWrapper(TextIOBase): # Read a chunk, decode it, and put the result in self._decoded_chars. input_chunk = self.buffer.read1(self._CHUNK_SIZE) eof = not input_chunk - self._set_decoded_chars(self._decoder.decode(input_chunk, eof)) + decoded_chars = self._decoder.decode(input_chunk, eof) + self._set_decoded_chars(decoded_chars) + if decoded_chars: + self._b2cratio = len(input_chunk) / len(self._decoded_chars) + else: + self._b2cratio = 0.0 if self._telling: # At the snapshot point, len(dec_buffer) bytes before the read, @@ -1732,20 +1738,56 @@ class TextIOWrapper(TextIOBase): # forward until it gives us enough decoded characters. saved_state = decoder.getstate() try: + # Fast search for an acceptable start point, close to our + # current pos. + # Rationale: calling decoder.decode() has a large overhead + # regardless of chunk size; we want the number of such calls to + # be O(1) in most situations (common decoders, non-crazy input). + # Actually, it will be exactly 1 for fixed-size codecs (all + # 8-bit codecs, also UTF-16 and UTF-32). + skip_bytes = int(self._b2cratio * chars_to_skip) + skip_back = 1 + assert skip_bytes <= len(next_input) + while skip_bytes > 0: + decoder.setstate((b'', dec_flags)) + # Decode up to temptative start point + n = len(decoder.decode(next_input[:skip_bytes])) + if n <= chars_to_skip: + b, d = decoder.getstate() + if not b: + # Before pos and no bytes buffered in decoder => OK + dec_flags = d + chars_to_skip -= n + break + # Skip back by buffered amount and reset heuristic + skip_bytes -= len(b) + skip_back = 1 + else: + # We're too far ahead, skip back a bit + skip_bytes -= skip_back + skip_back = skip_back * 2 + else: + skip_bytes = 0 + decoder.setstate((b'', dec_flags)) + # Note our initial start point. - decoder.setstate((b'', dec_flags)) - start_pos = position - start_flags, bytes_fed, chars_decoded = dec_flags, 0, 0 - need_eof = 0 + start_pos = position + skip_bytes + start_flags = dec_flags + if chars_to_skip == 0: + # We haven't moved from the start point. + return self._pack_cookie(start_pos, start_flags) # Feed the decoder one byte at a time. As we go, note the # nearest "safe start point" before the current location # (a point where the decoder has nothing buffered, so seek() # can safely start from there and advance to this location). - next_byte = bytearray(1) - for next_byte[0] in next_input: + bytes_fed = 0 + need_eof = 0 + # Chars decoded since `start_pos` + chars_decoded = 0 + for i in range(skip_bytes, len(next_input)): bytes_fed += 1 - chars_decoded += len(decoder.decode(next_byte)) + chars_decoded += len(decoder.decode(next_input[i:i+1])) dec_buffer, dec_flags = decoder.getstate() if not dec_buffer and chars_decoded <= chars_to_skip: # Decoder buffer is empty, so this is a safe start point. @@ -133,11 +133,14 @@ class ABCMeta(type): return cls def register(cls, subclass): - """Register a virtual subclass of an ABC.""" + """Register a virtual subclass of an ABC. + + Returns the subclass, to allow usage as a class decorator. + """ if not isinstance(subclass, type): raise TypeError("Can only register classes") if issubclass(subclass, cls): - return # Already a subclass + return subclass # Already a subclass # Subtle: test for cycles *after* testing for "already a subclass"; # this means we allow X.register(X) and interpret it as a no-op. if issubclass(cls, subclass): @@ -145,6 +148,7 @@ class ABCMeta(type): raise RuntimeError("Refusing to create an inheritance cycle") cls._abc_registry.add(subclass) ABCMeta._abc_invalidation_counter += 1 # Invalidate negative cache + return subclass def _dump_registry(cls, file=None): """Debug helper to print the ABC registry.""" diff --git a/Lib/asynchat.py b/Lib/asynchat.py index 6558512..2199d1b 100644 --- a/Lib/asynchat.py +++ b/Lib/asynchat.py @@ -75,7 +75,7 @@ class async_chat (asyncore.dispatcher): # sign of an application bug that we don't want to pass silently use_encoding = 0 - encoding = 'latin1' + encoding = 'latin-1' def __init__ (self, sock=None, map=None): # for string terminator matching diff --git a/Lib/asyncore.py b/Lib/asyncore.py index 5d7bdda..d647606 100644 --- a/Lib/asyncore.py +++ b/Lib/asyncore.py @@ -289,7 +289,7 @@ class dispatcher: del map[fd] self._fileno = None - def create_socket(self, family, type): + def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM): self.family_and_type = family, type sock = socket.socket(family, type) sock.setblocking(0) diff --git a/Lib/collections.py b/Lib/collections/__init__.py index 98c4325..d4e8ed6 100644 --- a/Lib/collections.py +++ b/Lib/collections/__init__.py @@ -1,10 +1,11 @@ __all__ = ['deque', 'defaultdict', 'namedtuple', 'UserDict', 'UserList', 'UserString', 'Counter', 'OrderedDict'] -# For bootstrapping reasons, the collection ABCs are defined in _abcoll.py. -# They should however be considered an integral part of collections.py. -from _abcoll import * -import _abcoll -__all__ += _abcoll.__all__ + +# For backwards compatibility, continue to make the collections ABCs +# available through the collections module. +from collections.abc import * +import collections.abc +__all__ += collections.abc.__all__ from _collections import deque, defaultdict from operator import itemgetter as _itemgetter @@ -232,10 +233,58 @@ class OrderedDict(dict): ### namedtuple ################################################################################ +_class_template = '''\ +class {typename}(tuple): + '{typename}({arg_list})' + + __slots__ = () + + _fields = {field_names!r} + + def __new__(_cls, {arg_list}): + 'Create new instance of {typename}({arg_list})' + return _tuple.__new__(_cls, ({arg_list})) + + @classmethod + def _make(cls, iterable, new=tuple.__new__, len=len): + 'Make a new {typename} object from a sequence or iterable' + result = new(cls, iterable) + if len(result) != {num_fields:d}: + raise TypeError('Expected {num_fields:d} arguments, got %d' % len(result)) + return result + + def __repr__(self): + 'Return a nicely formatted representation string' + return self.__class__.__name__ + '({repr_fmt})' % self + + def _asdict(self): + 'Return a new OrderedDict which maps field names to their values' + return OrderedDict(zip(self._fields, self)) + + def _replace(_self, **kwds): + 'Return a new {typename} object replacing specified fields with new values' + result = _self._make(map(kwds.pop, {field_names!r}, _self)) + if kwds: + raise ValueError('Got unexpected field names: %r' % list(kwds)) + return result + + def __getnewargs__(self): + 'Return self as a plain tuple. Used by copy and pickle.' + return tuple(self) + +{field_defs} +''' + +_repr_template = '{name}=%r' + +_field_template = '''\ + {name} = _property(_itemgetter({index:d}), doc='Alias for field number {index:d}') +''' + def namedtuple(typename, field_names, verbose=False, rename=False): """Returns a new subclass of tuple with named fields. - >>> Point = namedtuple('Point', 'x y') + >>> Point = namedtuple('Point', ['x', 'y']) >>> Point.__doc__ # docstring for the new class 'Point(x, y)' >>> p = Point(11, y=22) # instantiate with positional args or keywords @@ -260,18 +309,17 @@ def namedtuple(typename, field_names, verbose=False, rename=False): # generating informative error messages and preventing template injection attacks. if isinstance(field_names, str): field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas - field_names = tuple(map(str, field_names)) + field_names = list(map(str, field_names)) if rename: - names = list(field_names) seen = set() - for i, name in enumerate(names): - if (not all(c.isalnum() or c=='_' for c in name) or _iskeyword(name) + for index, name in enumerate(field_names): + if (not all(c.isalnum() or c=='_' for c in name) + or _iskeyword(name) or not name or name[0].isdigit() or name.startswith('_') or name in seen): - names[i] = '_%d' % i + field_names[index] = '_%d' % index seen.add(name) - field_names = tuple(names) - for name in (typename,) + field_names: + for name in [typename] + field_names: if not all(c.isalnum() or c=='_' for c in name): raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name) if _iskeyword(name): @@ -286,52 +334,27 @@ def namedtuple(typename, field_names, verbose=False, rename=False): raise ValueError('Encountered duplicate field name: %r' % name) seen_names.add(name) - # Create and fill-in the class template - numfields = len(field_names) - argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes - reprtxt = ', '.join('%s=%%r' % name for name in field_names) - template = '''class %(typename)s(tuple): - '%(typename)s(%(argtxt)s)' \n - __slots__ = () \n - _fields = %(field_names)r \n - def __new__(_cls, %(argtxt)s): - 'Create new instance of %(typename)s(%(argtxt)s)' - return _tuple.__new__(_cls, (%(argtxt)s)) \n - @classmethod - def _make(cls, iterable, new=tuple.__new__, len=len): - 'Make a new %(typename)s object from a sequence or iterable' - result = new(cls, iterable) - if len(result) != %(numfields)d: - raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result)) - return result \n - def __repr__(self): - 'Return a nicely formatted representation string' - return self.__class__.__name__ + '(%(reprtxt)s)' %% self \n - def _asdict(self): - 'Return a new OrderedDict which maps field names to their values' - return OrderedDict(zip(self._fields, self)) \n - def _replace(_self, **kwds): - 'Return a new %(typename)s object replacing specified fields with new values' - result = _self._make(map(kwds.pop, %(field_names)r, _self)) - if kwds: - raise ValueError('Got unexpected field names: %%r' %% kwds.keys()) - return result \n - def __getnewargs__(self): - 'Return self as a plain tuple. Used by copy and pickle.' - return tuple(self) \n\n''' % locals() - for i, name in enumerate(field_names): - template += " %s = _property(_itemgetter(%d), doc='Alias for field number %d')\n" % (name, i, i) + # Fill-in the class template + class_definition = _class_template.format( + typename = typename, + field_names = tuple(field_names), + num_fields = len(field_names), + arg_list = repr(tuple(field_names)).replace("'", "")[1:-1], + repr_fmt = ', '.join(_repr_template.format(name=name) for name in field_names), + field_defs = '\n'.join(_field_template.format(index=index, name=name) + for index, name in enumerate(field_names)) + ) if verbose: - print(template) + print(class_definition) - # Execute the template string in a temporary namespace and + # Execute the class definition string in a temporary namespace and # support tracing utilities by setting a value for frame.f_globals['__name__'] namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename, OrderedDict=OrderedDict, _property=property, _tuple=tuple) try: - exec(template, namespace) + exec(class_definition, namespace) except SyntaxError as e: - raise SyntaxError(e.msg + ':\n\n' + template) + raise SyntaxError(e.msg + ':\n\n' + class_definition) result = namespace[typename] # For pickling to work, the __module__ variable needs to be set to the frame @@ -632,10 +655,10 @@ class Counter(dict): ######################################################################## -### ChainMap (helper for configparser) +### ChainMap (helper for configparser and string.Template) ######################################################################## -class _ChainMap(MutableMapping): +class ChainMap(MutableMapping): ''' A ChainMap groups multiple dicts (or other mappings) together to create a single, updateable view. @@ -678,6 +701,9 @@ class _ChainMap(MutableMapping): def __contains__(self, key): return any(key in m for m in self.maps) + def __bool__(self): + return any(self.maps) + @_recursive_repr() def __repr__(self): return '{0.__class__.__name__}({1})'.format( @@ -843,6 +869,8 @@ class UserList(MutableSequence): def insert(self, i, item): self.data.insert(i, item) def pop(self, i=-1): return self.data.pop(i) def remove(self, item): self.data.remove(item) + def clear(self): self.data.clear() + def copy(self): return self.data.copy() def count(self, item): return self.data.count(item) def index(self, item, *args): return self.data.index(item, *args) def reverse(self): self.data.reverse() diff --git a/Lib/_abcoll.py b/Lib/collections/abc.py index 2417d18..7fbe84d 100644 --- a/Lib/_abcoll.py +++ b/Lib/collections/abc.py @@ -3,9 +3,7 @@ """Abstract Base Classes (ABCs) for collections, according to PEP 3119. -DON'T USE THIS MODULE DIRECTLY! The classes here should be imported -via collections; they are defined here only to alleviate certain -bootstrapping issues. Unit tests are in test_collections. +Unit tests are in test_collections. """ from abc import ABCMeta, abstractmethod @@ -48,6 +46,8 @@ dict_proxy = type(type.__dict__) class Hashable(metaclass=ABCMeta): + __slots__ = () + @abstractmethod def __hash__(self): return 0 @@ -65,6 +65,8 @@ class Hashable(metaclass=ABCMeta): class Iterable(metaclass=ABCMeta): + __slots__ = () + @abstractmethod def __iter__(self): while False: @@ -80,6 +82,8 @@ class Iterable(metaclass=ABCMeta): class Iterator(Iterable): + __slots__ = () + @abstractmethod def __next__(self): raise StopIteration @@ -111,6 +115,8 @@ Iterator.register(zip_iterator) class Sized(metaclass=ABCMeta): + __slots__ = () + @abstractmethod def __len__(self): return 0 @@ -125,6 +131,8 @@ class Sized(metaclass=ABCMeta): class Container(metaclass=ABCMeta): + __slots__ = () + @abstractmethod def __contains__(self, x): return False @@ -139,6 +147,8 @@ class Container(metaclass=ABCMeta): class Callable(metaclass=ABCMeta): + __slots__ = () + @abstractmethod def __call__(self, *args, **kwds): return False @@ -166,6 +176,8 @@ class Set(Sized, Iterable, Container): then the other operations will automatically follow suit. """ + __slots__ = () + def __le__(self, other): if not isinstance(other, Set): return NotImplemented @@ -277,6 +289,8 @@ Set.register(frozenset) class MutableSet(Set): + __slots__ = () + @abstractmethod def add(self, value): """Add an element.""" @@ -350,6 +364,8 @@ MutableSet.register(set) class Mapping(Sized, Iterable, Container): + __slots__ = () + @abstractmethod def __getitem__(self, key): raise KeyError @@ -453,6 +469,8 @@ ValuesView.register(dict_values) class MutableMapping(Mapping): + __slots__ = () + @abstractmethod def __setitem__(self, key, value): raise KeyError @@ -532,6 +550,8 @@ class Sequence(Sized, Iterable, Container): __getitem__, and __len__. """ + __slots__ = () + @abstractmethod def __getitem__(self, index): raise IndexError @@ -577,12 +597,16 @@ class ByteString(Sequence): XXX Should add all their methods. """ + __slots__ = () + ByteString.register(bytes) ByteString.register(bytearray) class MutableSequence(Sequence): + __slots__ = () + @abstractmethod def __setitem__(self, index, value): raise IndexError @@ -598,6 +622,13 @@ class MutableSequence(Sequence): def append(self, value): self.insert(len(self), value) + def clear(self): + try: + while True: + self.pop() + except IndexError: + pass + def reverse(self): n = len(self) for i in range(n//2): diff --git a/Lib/compileall.py b/Lib/compileall.py index d79a1bb..743be27 100644 --- a/Lib/compileall.py +++ b/Lib/compileall.py @@ -35,11 +35,11 @@ def compile_dir(dir, maxlevels=10, ddir=None, force=False, rx=None, optimize: optimization level or -1 for level of the interpreter """ if not quiet: - print('Listing', dir, '...') + print('Listing {!r}...'.format(dir)) try: names = os.listdir(dir) except os.error: - print("Can't list", dir) + print("Can't list {!r}".format(dir)) names = [] names.sort() success = 1 @@ -109,13 +109,13 @@ def compile_file(fullname, ddir=None, force=False, rx=None, quiet=False, except IOError: pass if not quiet: - print('Compiling', fullname, '...') + print('Compiling {!r}...'.format(fullname)) try: ok = py_compile.compile(fullname, cfile, dfile, True, optimize=optimize) except py_compile.PyCompileError as err: if quiet: - print('*** Error compiling', fullname, '...') + print('*** Error compiling {!r}...'.format(fullname)) else: print('*** ', end='') # escape non-printable characters in msg @@ -126,7 +126,7 @@ def compile_file(fullname, ddir=None, force=False, rx=None, quiet=False, success = 0 except (SyntaxError, UnicodeError, IOError) as e: if quiet: - print('*** Error compiling', fullname, '...') + print('*** Error compiling {!r}...'.format(fullname)) else: print('*** ', end='') print(e.__class__.__name__ + ':', e) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 79c60c3..44f8504 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -66,28 +66,14 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_thread_references = set() +_live_threads = weakref.WeakSet() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + for thread in _live_threads: + thread.join() # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for @@ -279,7 +265,6 @@ class ProcessPoolExecutor(_base.Executor): worker processes will be created as the machine has processors. """ _check_system_limits() - _remove_dead_thread_references() if max_workers is None: self._max_workers = multiprocessing.cpu_count() @@ -316,7 +301,7 @@ class ProcessPoolExecutor(_base.Executor): self._shutdown_process_event)) self._queue_management_thread.daemon = True self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) + _live_threads.add(self._queue_management_thread) def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 15736da..299b94a 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -25,29 +25,14 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads finish. -_thread_references = set() +_live_threads = weakref.WeakSet() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - + for thread in _live_threads: + thread.join() atexit.register(_python_exit) class _WorkItem(object): @@ -95,8 +80,6 @@ class ThreadPoolExecutor(_base.Executor): max_workers: The maximum number of threads that can be used to execute the given calls. """ - _remove_dead_thread_references() - self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() @@ -125,7 +108,7 @@ class ThreadPoolExecutor(_base.Executor): t.daemon = True t.start() self._threads.add(t) - _thread_references.add(weakref.ref(t)) + _live_threads.add(t) def shutdown(self, wait=True): with self._shutdown_lock: diff --git a/Lib/configparser.py b/Lib/configparser.py index 1bfdac8..15fc266 100644 --- a/Lib/configparser.py +++ b/Lib/configparser.py @@ -119,7 +119,8 @@ ConfigParser -- responsible for parsing a list of between keys and values are surrounded by spaces. """ -from collections import MutableMapping, OrderedDict as _default_dict, _ChainMap +from collections.abc import MutableMapping +from collections import OrderedDict as _default_dict, ChainMap as _ChainMap import functools import io import itertools diff --git a/Lib/crypt.py b/Lib/crypt.py new file mode 100644 index 0000000..e65b0cb --- /dev/null +++ b/Lib/crypt.py @@ -0,0 +1,62 @@ +"""Wrapper to the POSIX crypt library call and associated functionality.""" + +import _crypt +import string +from random import choice +from collections import namedtuple + + +_saltchars = string.ascii_letters + string.digits + './' + + +class _Method(namedtuple('_Method', 'name ident salt_chars total_size')): + + """Class representing a salt method per the Modular Crypt Format or the + legacy 2-character crypt method.""" + + def __repr__(self): + return '<crypt.METHOD_{}>'.format(self.name) + + + +def mksalt(method=None): + """Generate a salt for the specified method. + + If not specified, the strongest available method will be used. + + """ + if method is None: + method = methods[0] + s = '${}$'.format(method.ident) if method.ident else '' + s += ''.join(choice(_saltchars) for _ in range(method.salt_chars)) + return s + + +def crypt(word, salt=None): + """Return a string representing the one-way hash of a password, with a salt + prepended. + + If ``salt`` is not specified or is ``None``, the strongest + available method will be selected and a salt generated. Otherwise, + ``salt`` may be one of the ``crypt.METHOD_*`` values, or a string as + returned by ``crypt.mksalt()``. + + """ + if salt is None or isinstance(salt, _Method): + salt = mksalt(salt) + return _crypt.crypt(word, salt) + + +# available salting/crypto methods +METHOD_CRYPT = _Method('CRYPT', None, 2, 13) +METHOD_MD5 = _Method('MD5', '1', 8, 34) +METHOD_SHA256 = _Method('SHA256', '5', 16, 63) +METHOD_SHA512 = _Method('SHA512', '6', 16, 106) + +methods = [] +for _method in (METHOD_SHA512, METHOD_SHA256, METHOD_MD5): + _result = crypt('', _method) + if _result and len(_result) == _method.total_size: + methods.append(_method) +methods.append(METHOD_CRYPT) +del _result, _method diff --git a/Lib/ctypes/test/test_memfunctions.py b/Lib/ctypes/test/test_memfunctions.py index aa2113b..aec4aaa 100644 --- a/Lib/ctypes/test/test_memfunctions.py +++ b/Lib/ctypes/test/test_memfunctions.py @@ -1,4 +1,5 @@ import sys +from test import support import unittest from ctypes import * @@ -49,6 +50,7 @@ class MemFunctionsTest(unittest.TestCase): self.assertEqual(cast(a, POINTER(c_byte))[:7:7], [97]) + @support.refcount_test def test_string_at(self): s = string_at(b"foo bar") # XXX The following may be wrong, depending on how Python diff --git a/Lib/ctypes/test/test_python_api.py b/Lib/ctypes/test/test_python_api.py index 1f4c603..9de3980 100644 --- a/Lib/ctypes/test/test_python_api.py +++ b/Lib/ctypes/test/test_python_api.py @@ -1,5 +1,6 @@ from ctypes import * import unittest, sys +from test import support from ctypes.test import is_resource_enabled ################################################################ @@ -25,6 +26,7 @@ class PythonAPITestCase(unittest.TestCase): self.assertEqual(PyBytes_FromStringAndSize(b"abcdefghi", 3), b"abc") + @support.refcount_test def test_PyString_FromString(self): pythonapi.PyBytes_FromString.restype = py_object pythonapi.PyBytes_FromString.argtypes = (c_char_p,) @@ -56,6 +58,7 @@ class PythonAPITestCase(unittest.TestCase): del res self.assertEqual(grc(42), ref42) + @support.refcount_test def test_PyObj_FromPtr(self): s = "abc def ghi jkl" ref = grc(s) diff --git a/Lib/ctypes/test/test_refcounts.py b/Lib/ctypes/test/test_refcounts.py index 35a81aa..5613e7a 100644 --- a/Lib/ctypes/test/test_refcounts.py +++ b/Lib/ctypes/test/test_refcounts.py @@ -1,4 +1,5 @@ import unittest +from test import support import ctypes import gc @@ -10,6 +11,7 @@ dll = ctypes.CDLL(_ctypes_test.__file__) class RefcountTestCase(unittest.TestCase): + @support.refcount_test def test_1(self): from sys import getrefcount as grc @@ -34,6 +36,7 @@ class RefcountTestCase(unittest.TestCase): self.assertEqual(grc(callback), 2) + @support.refcount_test def test_refcount(self): from sys import getrefcount as grc def func(*args): diff --git a/Lib/ctypes/test/test_stringptr.py b/Lib/ctypes/test/test_stringptr.py index 3d25fa5..95cd161 100644 --- a/Lib/ctypes/test/test_stringptr.py +++ b/Lib/ctypes/test/test_stringptr.py @@ -1,4 +1,5 @@ import unittest +from test import support from ctypes import * import _ctypes_test @@ -7,6 +8,7 @@ lib = CDLL(_ctypes_test.__file__) class StringPtrTestCase(unittest.TestCase): + @support.refcount_test def test__POINTER_c_char(self): class X(Structure): _fields_ = [("str", POINTER(c_char))] diff --git a/Lib/distutils/__init__.py b/Lib/distutils/__init__.py index 49b6d51..bd7d9dd 100644 --- a/Lib/distutils/__init__.py +++ b/Lib/distutils/__init__.py @@ -15,5 +15,5 @@ __revision__ = "$Id$" # Updated automatically by the Python release process. # #--start constants-- -__version__ = "3.2" +__version__ = "3.3a0" #--end constants-- diff --git a/Lib/distutils/command/bdist_wininst.py b/Lib/distutils/command/bdist_wininst.py index b2e2fc6..b886055 100644 --- a/Lib/distutils/command/bdist_wininst.py +++ b/Lib/distutils/command/bdist_wininst.py @@ -263,11 +263,11 @@ class bdist_wininst(Command): cfgdata = cfgdata + b"\0" if self.pre_install_script: # We need to normalize newlines, so we open in text mode and - # convert back to bytes. "latin1" simply avoids any possible + # convert back to bytes. "latin-1" simply avoids any possible # failures. with open(self.pre_install_script, "r", - encoding="latin1") as script: - script_data = script.read().encode("latin1") + encoding="latin-1") as script: + script_data = script.read().encode("latin-1") cfgdata = cfgdata + script_data + b"\n\0" else: # empty pre-install script diff --git a/Lib/distutils/tests/test_bdist_rpm.py b/Lib/distutils/tests/test_bdist_rpm.py index 804fb13..030933f 100644 --- a/Lib/distutils/tests/test_bdist_rpm.py +++ b/Lib/distutils/tests/test_bdist_rpm.py @@ -28,6 +28,11 @@ class BuildRpmTestCase(support.TempdirManager, unittest.TestCase): def setUp(self): + try: + sys.executable.encode("UTF-8") + except UnicodeEncodeError: + raise unittest.SkipTest("sys.executable is not encodable to UTF-8") + super(BuildRpmTestCase, self).setUp() self.old_location = os.getcwd() self.old_sys_argv = sys.argv, sys.argv[:] diff --git a/Lib/doctest.py b/Lib/doctest.py index 9eba4e0..c508a7c 100644 --- a/Lib/doctest.py +++ b/Lib/doctest.py @@ -1373,6 +1373,7 @@ class DocTestRunner: # Note that the interactive output will go to *our* # save_stdout, even if that's not the real sys.stdout; this # allows us to write test cases for the set_trace behavior. + save_trace = sys.gettrace() save_set_trace = pdb.set_trace self.debugger = _OutputRedirectingPdb(save_stdout) self.debugger.reset() @@ -1392,6 +1393,7 @@ class DocTestRunner: finally: sys.stdout = save_stdout pdb.set_trace = save_set_trace + sys.settrace(save_trace) linecache.getlines = self.save_linecache_getlines sys.displayhook = save_displayhook if clear_globs: diff --git a/Lib/email/_parseaddr.py b/Lib/email/_parseaddr.py index 41694f9..4b2f5c6 100644 --- a/Lib/email/_parseaddr.py +++ b/Lib/email/_parseaddr.py @@ -99,6 +99,14 @@ def parsedate_tz(data): tss = '0' elif len(tm) == 3: [thh, tmm, tss] = tm + elif len(tm) == 1 and '.' in tm[0]: + # Some non-compliant MUAs use '.' to separate time elements. + tm = tm[0].split('.') + if len(tm) == 2: + [thh, tmm] = tm + tss = 0 + elif len(tm) == 3: + [thh, tmm, tss] = tm else: return None try: diff --git a/Lib/email/test/__init__.py b/Lib/email/test/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/Lib/email/test/__init__.py +++ /dev/null diff --git a/Lib/ftplib.py b/Lib/ftplib.py index 7c39887..e836b72 100644 --- a/Lib/ftplib.py +++ b/Lib/ftplib.py @@ -100,14 +100,15 @@ class FTP: file = None welcome = None passiveserver = 1 - encoding = "latin1" + encoding = "latin-1" # Initialization method (called by class instantiation). # Initialize host to localhost, port to standard ftp port # Optional arguments are host (for connect()), # and user, passwd, acct (for login()) def __init__(self, host='', user='', passwd='', acct='', - timeout=_GLOBAL_DEFAULT_TIMEOUT): + timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None): + self.source_address = source_address self.timeout = timeout if host: self.connect(host) @@ -128,10 +129,12 @@ class FTP: if self.sock is not None: self.close() - def connect(self, host='', port=0, timeout=-999): + def connect(self, host='', port=0, timeout=-999, source_address=None): '''Connect to host. Arguments are: - host: hostname to connect to (string, default previous host) - port: port to connect to (integer, default previous port) + - source_address: a 2-tuple (host, port) for the socket to bind + to as its source address before connecting. ''' if host != '': self.host = host @@ -139,7 +142,10 @@ class FTP: self.port = port if timeout != -999: self.timeout = timeout - self.sock = socket.create_connection((self.host, self.port), self.timeout) + if source_address is not None: + self.source_address = source_address + self.sock = socket.create_connection((self.host, self.port), self.timeout, + source_address=self.source_address) self.af = self.sock.family self.file = self.sock.makefile('r', encoding=self.encoding) self.welcome = self.getresp() @@ -334,7 +340,8 @@ class FTP: size = None if self.passiveserver: host, port = self.makepasv() - conn = socket.create_connection((host, port), self.timeout) + conn = socket.create_connection((host, port), self.timeout, + source_address=self.source_address) if rest is not None: self.sendcmd("REST %s" % rest) resp = self.sendcmd(cmd) @@ -589,11 +596,11 @@ class FTP: def close(self): '''Close the connection without assuming anything about it.''' - if self.file: + if self.file is not None: self.file.close() + if self.sock is not None: self.sock.close() - self.file = self.sock = None - + self.file = self.sock = None try: import ssl @@ -637,7 +644,7 @@ else: def __init__(self, host='', user='', passwd='', acct='', keyfile=None, certfile=None, context=None, - timeout=_GLOBAL_DEFAULT_TIMEOUT): + timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None): if context is not None and keyfile is not None: raise ValueError("context and keyfile arguments are mutually " "exclusive") @@ -648,7 +655,7 @@ else: self.certfile = certfile self.context = context self._prot_p = False - FTP.__init__(self, host, user, passwd, acct, timeout) + FTP.__init__(self, host, user, passwd, acct, timeout, source_address) def login(self, user='', passwd='', acct='', secure=True): if secure and not isinstance(self.sock, ssl.SSLSocket): diff --git a/Lib/functools.py b/Lib/functools.py index e92a2fc..3bffbac 100644 --- a/Lib/functools.py +++ b/Lib/functools.py @@ -141,7 +141,7 @@ def lru_cache(maxsize=100): tuple=tuple, sorted=sorted, len=len, KeyError=KeyError): hits = misses = 0 - kwd_mark = object() # separates positional and keyword args + kwd_mark = (object(),) # separates positional and keyword args lock = Lock() # needed because ordereddicts aren't threadsafe if maxsize is None: @@ -152,7 +152,7 @@ def lru_cache(maxsize=100): nonlocal hits, misses key = args if kwds: - key += (kwd_mark,) + tuple(sorted(kwds.items())) + key += kwd_mark + tuple(sorted(kwds.items())) try: result = cache[key] hits += 1 @@ -171,7 +171,7 @@ def lru_cache(maxsize=100): nonlocal hits, misses key = args if kwds: - key += (kwd_mark,) + tuple(sorted(kwds.items())) + key += kwd_mark + tuple(sorted(kwds.items())) try: with lock: result = cache[key] diff --git a/Lib/getopt.py b/Lib/getopt.py index 980861d..3d6ecbd 100644 --- a/Lib/getopt.py +++ b/Lib/getopt.py @@ -19,7 +19,7 @@ option involved with the exception. # Gerrit Holl <gerrit@nl.linux.org> moved the string-based exceptions # to class-based exceptions. # -# Peter Åstrand <astrand@lysator.liu.se> added gnu_getopt(). +# Peter Ã…strand <astrand@lysator.liu.se> added gnu_getopt(). # # TODO for gnu_getopt(): # @@ -34,6 +34,11 @@ option involved with the exception. __all__ = ["GetoptError","error","getopt","gnu_getopt"] import os +try: + from gettext import gettext as _ +except ImportError: + # Bootstrapping Python: gettext's dependencies not built yet + def _(s): return s class GetoptError(Exception): opt = '' @@ -153,10 +158,10 @@ def do_longs(opts, opt, longopts, args): if has_arg: if optarg is None: if not args: - raise GetoptError('option --%s requires argument' % opt, opt) + raise GetoptError(_('option --%s requires argument') % opt, opt) optarg, args = args[0], args[1:] elif optarg is not None: - raise GetoptError('option --%s must not have an argument' % opt, opt) + raise GetoptError(_('option --%s must not have an argument') % opt, opt) opts.append(('--' + opt, optarg or '')) return opts, args @@ -166,7 +171,7 @@ def do_longs(opts, opt, longopts, args): def long_has_args(opt, longopts): possibilities = [o for o in longopts if o.startswith(opt)] if not possibilities: - raise GetoptError('option --%s not recognized' % opt, opt) + raise GetoptError(_('option --%s not recognized') % opt, opt) # Is there an exact match? if opt in possibilities: return False, opt @@ -176,7 +181,7 @@ def long_has_args(opt, longopts): if len(possibilities) > 1: # XXX since possibilities contains all valid continuations, might be # nice to work them into the error msg - raise GetoptError('option --%s not a unique prefix' % opt, opt) + raise GetoptError(_('option --%s not a unique prefix') % opt, opt) assert len(possibilities) == 1 unique_match = possibilities[0] has_arg = unique_match.endswith('=') @@ -190,7 +195,7 @@ def do_shorts(opts, optstring, shortopts, args): if short_has_arg(opt, shortopts): if optstring == '': if not args: - raise GetoptError('option -%s requires argument' % opt, + raise GetoptError(_('option -%s requires argument') % opt, opt) optstring, args = args[0], args[1:] optarg, optstring = optstring, '' @@ -203,7 +208,7 @@ def short_has_arg(opt, shortopts): for i in range(len(shortopts)): if opt == shortopts[i] != ':': return shortopts.startswith(':', i+1) - raise GetoptError('option -%s not recognized' % opt, opt) + raise GetoptError(_('option -%s not recognized') % opt, opt) if __name__ == '__main__': import sys diff --git a/Lib/glob.py b/Lib/glob.py index c5f5f69..36d493d 100644 --- a/Lib/glob.py +++ b/Lib/glob.py @@ -1,6 +1,5 @@ """Filename globbing utility.""" -import sys import os import re import fnmatch diff --git a/Lib/http/client.py b/Lib/http/client.py index 604577c..2e68b3c 100644 --- a/Lib/http/client.py +++ b/Lib/http/client.py @@ -697,7 +697,7 @@ class HTTPConnection: self.send(connect_bytes) for header, value in self._tunnel_headers.items(): header_str = "%s: %s\r\n" % (header, value) - header_bytes = header_str.encode("latin1") + header_bytes = header_str.encode("latin-1") self.send(header_bytes) self.send(b'\r\n') @@ -937,7 +937,7 @@ class HTTPConnection: values = list(values) for i, one_value in enumerate(values): if hasattr(one_value, 'encode'): - values[i] = one_value.encode('latin1') + values[i] = one_value.encode('latin-1') elif isinstance(one_value, int): values[i] = str(one_value).encode('ascii') value = b'\r\n\t'.join(values) diff --git a/Lib/http/server.py b/Lib/http/server.py index 86fa37f..6aacbbd 100644 --- a/Lib/http/server.py +++ b/Lib/http/server.py @@ -452,7 +452,7 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler): message = '' if self.request_version != 'HTTP/0.9': self.wfile.write(("%s %d %s\r\n" % - (self.protocol_version, code, message)).encode('latin1', 'strict')) + (self.protocol_version, code, message)).encode('latin-1', 'strict')) def send_header(self, keyword, value): """Send a MIME header.""" @@ -460,7 +460,7 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler): if not hasattr(self, '_headers_buffer'): self._headers_buffer = [] self._headers_buffer.append( - ("%s: %s\r\n" % (keyword, value)).encode('latin1', 'strict')) + ("%s: %s\r\n" % (keyword, value)).encode('latin-1', 'strict')) if keyword.lower() == 'connection': if value.lower() == 'close': diff --git a/Lib/idlelib/idlever.py b/Lib/idlelib/idlever.py index 5b0907e..5e9afb1 100644 --- a/Lib/idlelib/idlever.py +++ b/Lib/idlelib/idlever.py @@ -1 +1 @@ -IDLE_VERSION = "3.2" +IDLE_VERSION = "3.3a0" diff --git a/Lib/importlib/_bootstrap.py b/Lib/importlib/_bootstrap.py index 425b8bf..a944bee 100644 --- a/Lib/importlib/_bootstrap.py +++ b/Lib/importlib/_bootstrap.py @@ -240,7 +240,7 @@ class BuiltinImporter: @classmethod @_requires_builtin def is_package(cls, fullname): - """Return None as built-in module are never packages.""" + """Return None as built-in modules are never packages.""" return False @@ -765,7 +765,7 @@ def _gcd_import(name, package=None, level=0): being made from, and the level adjustment. This function represents the greatest common denominator of functionality - between import_module and __import__. This includes settting __package__ if + between import_module and __import__. This includes setting __package__ if the loader did not. """ @@ -855,7 +855,7 @@ def __import__(name, globals={}, locals={}, fromlist=[], level=0): module = _gcd_import(name) else: # __package__ is not guaranteed to be defined or could be set to None - # to represent that it's proper value is unknown + # to represent that its proper value is unknown package = globals.get('__package__') if package is None: package = globals['__name__'] diff --git a/Lib/lib2to3/__main__.py b/Lib/lib2to3/__main__.py new file mode 100644 index 0000000..80688ba --- /dev/null +++ b/Lib/lib2to3/__main__.py @@ -0,0 +1,4 @@ +import sys +from .main import main + +sys.exit(main("lib2to3.fixes")) diff --git a/Lib/lib2to3/patcomp.py b/Lib/lib2to3/patcomp.py index bb538d5..0a259e9 100644 --- a/Lib/lib2to3/patcomp.py +++ b/Lib/lib2to3/patcomp.py @@ -11,6 +11,7 @@ The compiler compiles a pattern to a pytree.*Pattern instance. __author__ = "Guido van Rossum <guido@python.org>" # Python imports +import io import os # Fairly local imports @@ -32,7 +33,7 @@ class PatternSyntaxError(Exception): def tokenize_wrapper(input): """Tokenizes a string suppressing significant whitespace.""" skip = set((token.NEWLINE, token.INDENT, token.DEDENT)) - tokens = tokenize.generate_tokens(driver.generate_lines(input).__next__) + tokens = tokenize.generate_tokens(io.StringIO(input).readline) for quintuple in tokens: type, value, start, end, line_text = quintuple if type not in skip: diff --git a/Lib/lib2to3/pgen2/driver.py b/Lib/lib2to3/pgen2/driver.py index ee77a13..e7828ff 100644 --- a/Lib/lib2to3/pgen2/driver.py +++ b/Lib/lib2to3/pgen2/driver.py @@ -17,6 +17,7 @@ __all__ = ["Driver", "load_grammar"] # Python imports import codecs +import io import os import logging import sys @@ -101,18 +102,10 @@ class Driver(object): def parse_string(self, text, debug=False): """Parse a string and return the syntax tree.""" - tokens = tokenize.generate_tokens(generate_lines(text).__next__) + tokens = tokenize.generate_tokens(io.StringIO(text).readline) return self.parse_tokens(tokens, debug) -def generate_lines(text): - """Generator that behaves like readline without using StringIO.""" - for line in text.splitlines(True): - yield line - while True: - yield "" - - def load_grammar(gt="Grammar.txt", gp=None, save=True, force=False, logger=None): """Load the grammar (maybe from a pickle).""" diff --git a/Lib/lib2to3/tests/test_parser.py b/Lib/lib2to3/tests/test_parser.py index ce39e41..f389795 100644 --- a/Lib/lib2to3/tests/test_parser.py +++ b/Lib/lib2to3/tests/test_parser.py @@ -18,6 +18,16 @@ import os # Local imports from lib2to3.pgen2 import tokenize from ..pgen2.parse import ParseError +from lib2to3.pygram import python_symbols as syms + + +class TestDriver(support.TestCase): + + def test_formfeed(self): + s = """print 1\n\x0Cprint 2\n""" + t = driver.parse_string(s) + self.assertEqual(t.children[0].children[0].type, syms.print_stmt) + self.assertEqual(t.children[1].children[0].type, syms.print_stmt) class GrammarTest(support.TestCase): diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py index 96384bd..57b2057 100644 --- a/Lib/logging/handlers.py +++ b/Lib/logging/handlers.py @@ -1307,6 +1307,16 @@ class QueueListener(object): except queue.Empty: break + def enqueue_sentinel(self): + """ + This is used to enqueue the sentinel record. + + The base implementation uses put_nowait. You may want to override this + method if you want to use timeouts or work with custom queue + implementations. + """ + self.queue.put_nowait(self._sentinel) + def stop(self): """ Stop the listener. @@ -1316,6 +1326,6 @@ class QueueListener(object): may be some records still left on the queue, which won't be processed. """ self._stop.set() - self.queue.put_nowait(self._sentinel) + self.enqueue_sentinel() self._thread.join() self._thread = None diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index d6c23fb..d6627e5 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -434,10 +434,10 @@ class ConnectionWrapper(object): return self._loads(s) def _xml_dumps(obj): - return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') + return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): - (obj,), method = xmlrpclib.loads(s.decode('utf8')) + (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index b56a061..3fb9ff6 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -91,12 +91,16 @@ class Process(object): ''' _Popen = None - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, + *, daemon=None): assert group is None, 'group argument must be None for now' count = next(_current_process._counter) self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey - self._daemonic = _current_process._daemonic + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = _current_process._daemonic self._tempdir = _current_process._tempdir self._parent_pid = os.getpid() self._popen = None diff --git a/Lib/nntplib.py b/Lib/nntplib.py index bf66734..3e863dc 100644 --- a/Lib/nntplib.py +++ b/Lib/nntplib.py @@ -346,6 +346,20 @@ class _NNTPBase: # Log in and encryption setup order is left to subclasses. self.authenticated = False + def __enter__(self): + return self + + def __exit__(self, *args): + is_connected = lambda: hasattr(self, "file") + if is_connected(): + try: + self.quit() + except (socket.error, EOFError): + pass + finally: + if is_connected(): + self._close() + def getwelcome(self): """Get the welcome message from the server (this is read and squirreled away by __init__()). diff --git a/Lib/optparse.py b/Lib/optparse.py index 3eb652a..c207713 100644 --- a/Lib/optparse.py +++ b/Lib/optparse.py @@ -86,10 +86,16 @@ def _repr(self): # Id: errors.py 509 2006-04-20 00:58:24Z gward try: - from gettext import gettext + from gettext import gettext, ngettext except ImportError: def gettext(message): return message + + def ngettext(singular, plural, n): + if n == 1: + return singular + return plural + _ = gettext @@ -1478,11 +1484,10 @@ class OptionParser (OptionContainer): if option.takes_value(): nargs = option.nargs if len(rargs) < nargs: - if nargs == 1: - self.error(_("%s option requires an argument") % opt) - else: - self.error(_("%s option requires %d arguments") - % (opt, nargs)) + self.error(ngettext( + "%(option)s option requires %(number)d argument", + "%(option)s option requires %(number)d arguments", + nargs) % {"option": opt, "number": nargs}) elif nargs == 1: value = rargs.pop(0) else: @@ -1517,11 +1522,10 @@ class OptionParser (OptionContainer): nargs = option.nargs if len(rargs) < nargs: - if nargs == 1: - self.error(_("%s option requires an argument") % opt) - else: - self.error(_("%s option requires %d arguments") - % (opt, nargs)) + self.error(ngettext( + "%(option)s option requires %(number)d argument", + "%(option)s option requires %(number)d arguments", + nargs) % {"option": opt, "number": nargs}) elif nargs == 1: value = rargs.pop(0) else: @@ -434,7 +434,7 @@ def get_exec_path(env=None): # Change environ to automatically call putenv(), unsetenv if they exist. -from _abcoll import MutableMapping # Can't use collections (bootstrap) +from collections.abc import MutableMapping class _Environ(MutableMapping): def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue, putenv, unsetenv): diff --git a/Lib/platform.py b/Lib/platform.py index abe917a..75bc5f3 100755 --- a/Lib/platform.py +++ b/Lib/platform.py @@ -357,92 +357,11 @@ def dist(distname='',version='',id='', supported_dists=supported_dists, full_distribution_name=0) -class _popen: - - """ Fairly portable (alternative) popen implementation. - - This is mostly needed in case os.popen() is not available, or - doesn't work as advertised, e.g. in Win9X GUI programs like - PythonWin or IDLE. - - Writing to the pipe is currently not supported. - - """ - tmpfile = '' - pipe = None - bufsize = None - mode = 'r' - - def __init__(self,cmd,mode='r',bufsize=None): - - if mode != 'r': - raise ValueError('popen()-emulation only supports read mode') - import tempfile - self.tmpfile = tmpfile = tempfile.mktemp() - os.system(cmd + ' > %s' % tmpfile) - self.pipe = open(tmpfile,'rb') - self.bufsize = bufsize - self.mode = mode - - def read(self): - - return self.pipe.read() - - def readlines(self): - - if self.bufsize is not None: - return self.pipe.readlines() - - def close(self, - - remove=os.unlink,error=os.error): - - if self.pipe: - rc = self.pipe.close() - else: - rc = 255 - if self.tmpfile: - try: - remove(self.tmpfile) - except error: - pass - return rc - - # Alias - __del__ = close - def popen(cmd, mode='r', bufsize=-1): """ Portable popen() interface. """ - # Find a working popen implementation preferring win32pipe.popen - # over os.popen over _popen - popen = None - if os.environ.get('OS','') == 'Windows_NT': - # On NT win32pipe should work; on Win9x it hangs due to bugs - # in the MS C lib (see MS KnowledgeBase article Q150956) - try: - import win32pipe - except ImportError: - pass - else: - popen = win32pipe.popen - if popen is None: - if hasattr(os,'popen'): - popen = os.popen - # Check whether it works... it doesn't in GUI programs - # on Windows platforms - if sys.platform == 'win32': # XXX Others too ? - try: - popen('') - except os.error: - popen = _popen - else: - popen = _popen - if bufsize is None: - return popen(cmd,mode) - else: - return popen(cmd,mode,bufsize) + return os.popen(cmd, mode, bufsize) def _norm_version(version, build=''): diff --git a/Lib/poplib.py b/Lib/poplib.py index 84ea88d..d42d9dd 100644 --- a/Lib/poplib.py +++ b/Lib/poplib.py @@ -250,15 +250,18 @@ class POP3: def quit(self): """Signoff: commit changes on server, unlock mailbox, close connection.""" - try: - resp = self._shortcmd('QUIT') - except error_proto as val: - resp = val - self.file.close() - self.sock.close() - del self.file, self.sock + resp = self._shortcmd('QUIT') + self.close() return resp + def close(self): + """Close the connection without assuming anything about it.""" + if self.file is not None: + self.file.close() + if self.sock is not None: + self.sock.close() + self.file = self.sock = None + #__del__ = quit diff --git a/Lib/pydoc.py b/Lib/pydoc.py index 9d3cdd5..9f8f120 100755 --- a/Lib/pydoc.py +++ b/Lib/pydoc.py @@ -168,11 +168,11 @@ def _split_list(s, predicate): def visiblename(name, all=None): """Decide whether to show documentation on a variable.""" # Certain special names are redundant. - _hidden_names = ('__builtins__', '__doc__', '__file__', '__path__', + if name in {'__builtins__', '__doc__', '__file__', '__path__', '__module__', '__name__', '__slots__', '__package__', '__cached__', '__author__', '__credits__', '__date__', - '__version__') - if name in _hidden_names: return 0 + '__version__'}: + return 0 # Private names are hidden, but special names are displayed. if name.startswith('__') and name.endswith('__'): return 1 if all is not None: diff --git a/Lib/random.py b/Lib/random.py index 7e0de4f..3d2af24 100644 --- a/Lib/random.py +++ b/Lib/random.py @@ -42,7 +42,7 @@ from types import MethodType as _MethodType, BuiltinMethodType as _BuiltinMethod from math import log as _log, exp as _exp, pi as _pi, e as _e, ceil as _ceil from math import sqrt as _sqrt, acos as _acos, cos as _cos, sin as _sin from os import urandom as _urandom -import collections as _collections +from collections.abc import Set as _Set, Sequence as _Sequence from hashlib import sha512 as _sha512 __all__ = ["Random","seed","random","uniform","randint","choice","sample", @@ -114,7 +114,7 @@ class Random(_random.Random): if version == 2: if isinstance(a, (str, bytes, bytearray)): if isinstance(a, str): - a = a.encode("utf8") + a = a.encode("utf-8") a += _sha512(a).digest() a = int.from_bytes(a, 'big') @@ -293,10 +293,10 @@ class Random(_random.Random): # preferred since the list takes less space than the # set and it doesn't suffer from frequent reselections. - if isinstance(population, _collections.Set): + if isinstance(population, _Set): population = tuple(population) - if not isinstance(population, _collections.Sequence): - raise TypeError("Population must be a sequence or Set. For dicts, use list(d).") + if not isinstance(population, _Sequence): + raise TypeError("Population must be a sequence or set. For dicts, use list(d).") randbelow = self._randbelow n = len(population) if not 0 <= k <= n: diff --git a/Lib/site.py b/Lib/site.py index a2c0bec..fcfdbed 100644 --- a/Lib/site.py +++ b/Lib/site.py @@ -508,6 +508,11 @@ def execusercustomize(): def main(): + """Add standard site-specific directories to the module search path. + + This function is called automatically when this module is imported, + unless the python interpreter was started with the -S flag. + """ global ENABLE_USER_SITE abs_paths() @@ -526,7 +531,10 @@ def main(): if ENABLE_USER_SITE: execusercustomize() -main() +# Prevent edition of sys.path when python was started with -S and +# site is imported later. +if not sys.flags.no_site: + main() def _script(): help = """\ diff --git a/Lib/smtpd.py b/Lib/smtpd.py index 599e79b..32f45ae 100755 --- a/Lib/smtpd.py +++ b/Lib/smtpd.py @@ -275,7 +275,7 @@ class SMTPChannel(asynchat.async_chat): return elif limit: self.num_bytes += len(data) - self.received_lines.append(str(data, "utf8")) + self.received_lines.append(str(data, "utf-8")) # Implementation of base class abstract method def found_terminator(self): diff --git a/Lib/smtplib.py b/Lib/smtplib.py index 14e6250..213138c 100755 --- a/Lib/smtplib.py +++ b/Lib/smtplib.py @@ -269,6 +269,19 @@ class SMTP: pass self.local_hostname = '[%s]' % addr + def __enter__(self): + return self + + def __exit__(self, *args): + try: + code, message = self.docmd("QUIT") + if code != 221: + raise SMTPResponseException(code, message) + except SMTPServerDisconnected: + pass + finally: + self.close() + def set_debuglevel(self, debuglevel): """Set the debug output level. diff --git a/Lib/socket.py b/Lib/socket.py index 1e28549..5715034 100644 --- a/Lib/socket.py +++ b/Lib/socket.py @@ -112,6 +112,9 @@ class socket(_socket.socket): s[7:]) return s + def __getstate__(self): + raise TypeError("Cannot serialize socket object") + def dup(self): """dup() -> socket object diff --git a/Lib/sqlite3/test/types.py b/Lib/sqlite3/test/types.py index 29413e1..d214f3d 100644 --- a/Lib/sqlite3/test/types.py +++ b/Lib/sqlite3/test/types.py @@ -85,7 +85,7 @@ class DeclTypesTests(unittest.TestCase): if isinstance(_val, bytes): # sqlite3 always calls __init__ with a bytes created from a # UTF-8 string when __conform__ was used to store the object. - _val = _val.decode('utf8') + _val = _val.decode('utf-8') self.val = _val def __cmp__(self, other): diff --git a/Lib/sre_parse.py b/Lib/sre_parse.py index 13737ca..ae63c31 100644 --- a/Lib/sre_parse.py +++ b/Lib/sre_parse.py @@ -791,7 +791,7 @@ def parse_template(source, pattern): else: # The tokenizer implicitly decodes bytes objects as latin-1, we must # therefore re-encode the final representation. - encode = lambda x: x.encode('latin1') + encode = lambda x: x.encode('latin-1') for c, s in p: if c is MARK: groupsappend((i, s)) diff --git a/Lib/string.py b/Lib/string.py index ef0334c..d4f9cd9 100644 --- a/Lib/string.py +++ b/Lib/string.py @@ -46,23 +46,7 @@ def capwords(s, sep=None): #################################################################### import re as _re - -class _multimap: - """Helper class for combining multiple mappings. - - Used by .{safe_,}substitute() to combine the mapping and keyword - arguments. - """ - def __init__(self, primary, secondary): - self._primary = primary - self._secondary = secondary - - def __getitem__(self, key): - try: - return self._primary[key] - except KeyError: - return self._secondary[key] - +from collections import ChainMap class _TemplateMetaclass(type): pattern = r""" @@ -116,7 +100,7 @@ class Template(metaclass=_TemplateMetaclass): if not args: mapping = kws elif kws: - mapping = _multimap(kws, args[0]) + mapping = ChainMap(kws, args[0]) else: mapping = args[0] # Helper function for .sub() @@ -142,7 +126,7 @@ class Template(metaclass=_TemplateMetaclass): if not args: mapping = kws elif kws: - mapping = _multimap(kws, args[0]) + mapping = ChainMap(kws, args[0]) else: mapping = args[0] # Helper function for .sub() diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 039b3e6..4c66353 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -191,8 +191,10 @@ should prepare for OSErrors. A ValueError will be raised if Popen is called with invalid arguments. -check_call() and check_output() will raise CalledProcessError, if the -called process returns a non-zero return code. +Exceptions defined within this module inherit from SubprocessError. +check_call() and check_output() will raise CalledProcessError if the +called process returns a non-zero return code. TimeoutExpired +be raised if a timeout was specified and expired. Security @@ -340,6 +342,7 @@ mswindows = (sys.platform == "win32") import io import os +import time import traceback import gc import signal @@ -347,7 +350,10 @@ import builtins import warnings # Exception classes used by this module. -class CalledProcessError(Exception): +class SubprocessError(Exception): pass + + +class CalledProcessError(SubprocessError): """This exception is raised when a process run by check_call() or check_output() returns a non-zero exit status. The exit status will be stored in the returncode attribute; @@ -361,6 +367,20 @@ class CalledProcessError(Exception): return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode) +class TimeoutExpired(SubprocessError): + """This exception is raised when the timeout expires while waiting for a + child process. + """ + def __init__(self, cmd, timeout, output=None): + self.cmd = cmd + self.timeout = timeout + self.output = output + + def __str__(self): + return ("Command '%s' timed out after %s seconds" % + (self.cmd, self.timeout)) + + if mswindows: import threading import msvcrt @@ -412,7 +432,7 @@ else: return fds __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", - "getoutput", "check_output", "CalledProcessError"] + "getoutput", "check_output", "CalledProcessError", "DEVNULL"] if mswindows: from _subprocess import CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP @@ -437,6 +457,7 @@ def _cleanup(): PIPE = -1 STDOUT = -2 +DEVNULL = -3 def _eintr_retry_call(func, *args): @@ -449,15 +470,21 @@ def _eintr_retry_call(func, *args): raise -def call(*popenargs, **kwargs): - """Run command with arguments. Wait for command to complete, then - return the returncode attribute. +def call(*popenargs, timeout=None, **kwargs): + """Run command with arguments. Wait for command to complete or + timeout, then return the returncode attribute. The arguments are the same as for the Popen constructor. Example: retcode = call(["ls", "-l"]) """ - return Popen(*popenargs, **kwargs).wait() + p = Popen(*popenargs, **kwargs) + try: + return p.wait(timeout=timeout) + except TimeoutExpired: + p.kill() + p.wait() + raise def check_call(*popenargs, **kwargs): @@ -466,7 +493,7 @@ def check_call(*popenargs, **kwargs): CalledProcessError. The CalledProcessError object will have the return code in the returncode attribute. - The arguments are the same as for the Popen constructor. Example: + The arguments are the same as for the call function. Example: check_call(["ls", "-l"]) """ @@ -479,7 +506,7 @@ def check_call(*popenargs, **kwargs): return 0 -def check_output(*popenargs, **kwargs): +def check_output(*popenargs, timeout=None, **kwargs): r"""Run command with arguments and return its output as a byte string. If the exit code was non-zero it raises a CalledProcessError. The @@ -502,13 +529,15 @@ def check_output(*popenargs, **kwargs): if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it will be overridden.') process = Popen(*popenargs, stdout=PIPE, **kwargs) - output, unused_err = process.communicate() + try: + output, unused_err = process.communicate(timeout=timeout) + except TimeoutExpired: + process.kill() + output, unused_err = process.communicate() + raise TimeoutExpired(process.args, timeout, output=output) retcode = process.poll() if retcode: - cmd = kwargs.get("args") - if cmd is None: - cmd = popenargs[0] - raise CalledProcessError(retcode, cmd, output=output) + raise CalledProcessError(retcode, process.args, output=output) return output @@ -639,6 +668,8 @@ class Popen(object): _cleanup() self._child_created = False + self._input = None + self._communication_started = False if bufsize is None: bufsize = 0 # Restore default if not isinstance(bufsize, int): @@ -673,6 +704,7 @@ class Popen(object): raise ValueError("creationflags is only supported on Windows " "platforms") + self.args = args self.stdin = None self.stdout = None self.stderr = None @@ -768,8 +800,12 @@ class Popen(object): # Child is still running, keep us alive until we can wait on it. _active.append(self) + def _get_devnull(self): + if not hasattr(self, '_devnull'): + self._devnull = os.open(os.devnull, os.O_RDWR) + return self._devnull - def communicate(self, input=None): + def communicate(self, input=None, timeout=None): """Interact with process: Send data to stdin. Read data from stdout and stderr, until end-of-file is reached. Wait for process to terminate. The optional input argument should be a @@ -778,9 +814,19 @@ class Popen(object): communicate() returns a tuple (stdout, stderr).""" - # Optimization: If we are only using one pipe, or no pipe at - # all, using select() or threads is unnecessary. - if [self.stdin, self.stdout, self.stderr].count(None) >= 2: + if self._communication_started and input: + raise ValueError("Cannot send input after starting communication") + + if timeout is not None: + endtime = time.time() + timeout + else: + endtime = None + + # Optimization: If we are not worried about timeouts, we haven't + # started communicating, and we have one or zero pipes, using select() + # or threads is unnecessary. + if (endtime is None and not self._communication_started and + [self.stdin, self.stdout, self.stderr].count(None) >= 2): stdout = None stderr = None if self.stdin: @@ -796,13 +842,36 @@ class Popen(object): self.wait() return (stdout, stderr) - return self._communicate(input) + try: + stdout, stderr = self._communicate(input, endtime, timeout) + finally: + self._communication_started = True + + sts = self.wait(timeout=self._remaining_time(endtime)) + + return (stdout, stderr) def poll(self): return self._internal_poll() + def _remaining_time(self, endtime): + """Convenience for _communicate when computing timeouts.""" + if endtime is None: + return None + else: + return endtime - time.time() + + + def _check_timeout(self, endtime, orig_timeout): + """Convenience for checking if a timeout has expired.""" + if endtime is None: + return + if time.time() > endtime: + raise TimeoutExpired(self.args, orig_timeout) + + if mswindows: # # Windows methods @@ -824,6 +893,8 @@ class Popen(object): p2cread, _ = _subprocess.CreatePipe(None, 0) elif stdin == PIPE: p2cread, p2cwrite = _subprocess.CreatePipe(None, 0) + elif stdin == DEVNULL: + p2cread = msvcrt.get_osfhandle(self._get_devnull()) elif isinstance(stdin, int): p2cread = msvcrt.get_osfhandle(stdin) else: @@ -837,6 +908,8 @@ class Popen(object): _, c2pwrite = _subprocess.CreatePipe(None, 0) elif stdout == PIPE: c2pread, c2pwrite = _subprocess.CreatePipe(None, 0) + elif stdout == DEVNULL: + c2pwrite = msvcrt.get_osfhandle(self._get_devnull()) elif isinstance(stdout, int): c2pwrite = msvcrt.get_osfhandle(stdout) else: @@ -852,6 +925,8 @@ class Popen(object): errread, errwrite = _subprocess.CreatePipe(None, 0) elif stderr == STDOUT: errwrite = c2pwrite + elif stderr == DEVNULL: + errwrite = msvcrt.get_osfhandle(self._get_devnull()) elif isinstance(stderr, int): errwrite = msvcrt.get_osfhandle(stderr) else: @@ -961,6 +1036,8 @@ class Popen(object): c2pwrite.Close() if errwrite != -1: errwrite.Close() + if hasattr(self, '_devnull'): + os.close(self._devnull) # Retain the process handle, but close the thread handle self._child_created = True @@ -985,12 +1062,20 @@ class Popen(object): return self.returncode - def wait(self): + def wait(self, timeout=None, endtime=None): """Wait for child process to terminate. Returns returncode attribute.""" + if endtime is not None: + timeout = self._remaining_time(endtime) + if timeout is None: + timeout_millis = _subprocess.INFINITE + else: + timeout_millis = int(timeout * 1000) if self.returncode is None: - _subprocess.WaitForSingleObject(self._handle, - _subprocess.INFINITE) + result = _subprocess.WaitForSingleObject(self._handle, + timeout_millis) + if result == _subprocess.WAIT_TIMEOUT: + raise TimeoutExpired(self.args, timeout) self.returncode = _subprocess.GetExitCodeProcess(self._handle) return self.returncode @@ -1000,32 +1085,51 @@ class Popen(object): fh.close() - def _communicate(self, input): - stdout = None # Return - stderr = None # Return - - if self.stdout: - stdout = [] - stdout_thread = threading.Thread(target=self._readerthread, - args=(self.stdout, stdout)) - stdout_thread.daemon = True - stdout_thread.start() - if self.stderr: - stderr = [] - stderr_thread = threading.Thread(target=self._readerthread, - args=(self.stderr, stderr)) - stderr_thread.daemon = True - stderr_thread.start() + def _communicate(self, input, endtime, orig_timeout): + # Start reader threads feeding into a list hanging off of this + # object, unless they've already been started. + if self.stdout and not hasattr(self, "_stdout_buff"): + self._stdout_buff = [] + self.stdout_thread = \ + threading.Thread(target=self._readerthread, + args=(self.stdout, self._stdout_buff)) + self.stdout_thread.daemon = True + self.stdout_thread.start() + if self.stderr and not hasattr(self, "_stderr_buff"): + self._stderr_buff = [] + self.stderr_thread = \ + threading.Thread(target=self._readerthread, + args=(self.stderr, self._stderr_buff)) + self.stderr_thread.daemon = True + self.stderr_thread.start() if self.stdin: if input is not None: self.stdin.write(input) self.stdin.close() + # Wait for the reader threads, or time out. If we time out, the + # threads remain reading and the fds left open in case the user + # calls communicate again. + if self.stdout is not None: + self.stdout_thread.join(self._remaining_time(endtime)) + if self.stdout_thread.isAlive(): + raise TimeoutExpired(self.args, orig_timeout) + if self.stderr is not None: + self.stderr_thread.join(self._remaining_time(endtime)) + if self.stderr_thread.isAlive(): + raise TimeoutExpired(self.args, orig_timeout) + + # Collect the output from and close both pipes, now that we know + # both have been read successfully. + stdout = None + stderr = None if self.stdout: - stdout_thread.join() + stdout = self._stdout_buff + self.stdout.close() if self.stderr: - stderr_thread.join() + stderr = self._stderr_buff + self.stderr.close() # All data exchanged. Translate lists into strings. if stdout is not None: @@ -1033,7 +1137,6 @@ class Popen(object): if stderr is not None: stderr = stderr[0] - self.wait() return (stdout, stderr) def send_signal(self, sig): @@ -1071,6 +1174,8 @@ class Popen(object): pass elif stdin == PIPE: p2cread, p2cwrite = _create_pipe() + elif stdin == DEVNULL: + p2cread = self._get_devnull() elif isinstance(stdin, int): p2cread = stdin else: @@ -1081,6 +1186,8 @@ class Popen(object): pass elif stdout == PIPE: c2pread, c2pwrite = _create_pipe() + elif stdout == DEVNULL: + c2pwrite = self._get_devnull() elif isinstance(stdout, int): c2pwrite = stdout else: @@ -1093,6 +1200,8 @@ class Popen(object): errread, errwrite = _create_pipe() elif stderr == STDOUT: errwrite = c2pwrite + elif stderr == DEVNULL: + errwrite = self._get_devnull() elif isinstance(stderr, int): errwrite = stderr else: @@ -1123,7 +1232,7 @@ class Popen(object): restore_signals, start_new_session): """Execute program (POSIX version)""" - if isinstance(args, str): + if isinstance(args, (str, bytes)): args = [args] else: args = list(args) @@ -1286,6 +1395,8 @@ class Popen(object): os.close(c2pwrite) if errwrite != -1 and errread != -1: os.close(errwrite) + if hasattr(self, '_devnull'): + os.close(self._devnull) # Wait for exec to fail or succeed; possibly raising an # exception (limited in size) @@ -1363,25 +1474,57 @@ class Popen(object): return self.returncode - def wait(self): + def _try_wait(self, wait_flags): + try: + (pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags) + except OSError as e: + if e.errno != errno.ECHILD: + raise + # This happens if SIGCLD is set to be ignored or waiting + # for child processes has otherwise been disabled for our + # process. This child is dead, we can't get the status. + pid = self.pid + sts = 0 + return (pid, sts) + + + def wait(self, timeout=None, endtime=None): """Wait for child process to terminate. Returns returncode attribute.""" - if self.returncode is None: - try: - pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0) - except OSError as e: - if e.errno != errno.ECHILD: - raise - # This happens if SIGCLD is set to be ignored or waiting - # for child processes has otherwise been disabled for our - # process. This child is dead, we can't get the status. - sts = 0 + if self.returncode is not None: + return self.returncode + + # endtime is preferred to timeout. timeout is only used for + # printing. + if endtime is not None or timeout is not None: + if endtime is None: + endtime = time.time() + timeout + elif timeout is None: + timeout = self._remaining_time(endtime) + + if endtime is not None: + # Enter a busy loop if we have a timeout. This busy loop was + # cribbed from Lib/threading.py in Thread.wait() at r71065. + delay = 0.0005 # 500 us -> initial delay of 1 ms + while True: + (pid, sts) = self._try_wait(os.WNOHANG) + assert pid == self.pid or pid == 0 + if pid == self.pid: + self._handle_exitstatus(sts) + break + remaining = self._remaining_time(endtime) + if remaining <= 0: + raise TimeoutExpired(self.args, timeout) + delay = min(delay * 2, remaining, .05) + time.sleep(delay) + elif self.returncode is None: + (pid, sts) = self._try_wait(0) self._handle_exitstatus(sts) return self.returncode - def _communicate(self, input): - if self.stdin: + def _communicate(self, input, endtime, orig_timeout): + if self.stdin and not self._communication_started: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. self.stdin.flush() @@ -1389,9 +1532,13 @@ class Popen(object): self.stdin.close() if _has_poll: - stdout, stderr = self._communicate_with_poll(input) + stdout, stderr = self._communicate_with_poll(input, endtime, + orig_timeout) else: - stdout, stderr = self._communicate_with_select(input) + stdout, stderr = self._communicate_with_select(input, endtime, + orig_timeout) + + self.wait(timeout=self._remaining_time(endtime)) # All data exchanged. Translate lists into strings. if stdout is not None: @@ -1409,60 +1556,77 @@ class Popen(object): stderr = self._translate_newlines(stderr, self.stderr.encoding) - self.wait() return (stdout, stderr) - def _communicate_with_poll(self, input): + def _communicate_with_poll(self, input, endtime, orig_timeout): stdout = None # Return stderr = None # Return - fd2file = {} - fd2output = {} + + if not self._communication_started: + self._fd2file = {} poller = select.poll() def register_and_append(file_obj, eventmask): poller.register(file_obj.fileno(), eventmask) - fd2file[file_obj.fileno()] = file_obj + self._fd2file[file_obj.fileno()] = file_obj def close_unregister_and_remove(fd): poller.unregister(fd) - fd2file[fd].close() - fd2file.pop(fd) + self._fd2file[fd].close() + self._fd2file.pop(fd) if self.stdin and input: register_and_append(self.stdin, select.POLLOUT) + # Only create this mapping if we haven't already. + if not self._communication_started: + self._fd2output = {} + if self.stdout: + self._fd2output[self.stdout.fileno()] = [] + if self.stderr: + self._fd2output[self.stderr.fileno()] = [] + select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI if self.stdout: register_and_append(self.stdout, select_POLLIN_POLLPRI) - fd2output[self.stdout.fileno()] = stdout = [] + stdout = self._fd2output[self.stdout.fileno()] if self.stderr: register_and_append(self.stderr, select_POLLIN_POLLPRI) - fd2output[self.stderr.fileno()] = stderr = [] + stderr = self._fd2output[self.stderr.fileno()] - input_offset = 0 - while fd2file: + # Save the input here so that if we time out while communicating, + # we can continue sending input if we retry. + if self.stdin and self._input is None: + self._input_offset = 0 + self._input = input + if self.universal_newlines: + self._input = self._input.encode(self.stdin.encoding) + + while self._fd2file: try: - ready = poller.poll() + ready = poller.poll(self._remaining_time(endtime)) except select.error as e: if e.args[0] == errno.EINTR: continue raise + self._check_timeout(endtime, orig_timeout) # XXX Rewrite these to use non-blocking I/O on the # file objects; they are no longer using C stdio! for fd, mode in ready: if mode & select.POLLOUT: - chunk = input[input_offset : input_offset + _PIPE_BUF] - input_offset += os.write(fd, chunk) - if input_offset >= len(input): + chunk = self._input[self._input_offset : + self._input_offset + _PIPE_BUF] + self._input_offset += os.write(fd, chunk) + if self._input_offset >= len(self._input): close_unregister_and_remove(fd) elif mode & select_POLLIN_POLLPRI: data = os.read(fd, 4096) if not data: close_unregister_and_remove(fd) - fd2output[fd].append(data) + self._fd2output[fd].append(data) else: # Ignore hang up or errors. close_unregister_and_remove(fd) @@ -1470,53 +1634,76 @@ class Popen(object): return (stdout, stderr) - def _communicate_with_select(self, input): - read_set = [] - write_set = [] + def _communicate_with_select(self, input, endtime, orig_timeout): + if not self._communication_started: + self._read_set = [] + self._write_set = [] + if self.stdin and input: + self._write_set.append(self.stdin) + if self.stdout: + self._read_set.append(self.stdout) + if self.stderr: + self._read_set.append(self.stderr) + + if self.stdin and self._input is None: + self._input_offset = 0 + self._input = input + if self.universal_newlines: + self._input = self._input.encode(self.stdin.encoding) + stdout = None # Return stderr = None # Return - if self.stdin and input: - write_set.append(self.stdin) if self.stdout: - read_set.append(self.stdout) - stdout = [] + if not self._communication_started: + self._stdout_buff = [] + stdout = self._stdout_buff if self.stderr: - read_set.append(self.stderr) - stderr = [] + if not self._communication_started: + self._stderr_buff = [] + stderr = self._stderr_buff - input_offset = 0 - while read_set or write_set: + while self._read_set or self._write_set: try: - rlist, wlist, xlist = select.select(read_set, write_set, []) + (rlist, wlist, xlist) = \ + select.select(self._read_set, self._write_set, [], + self._remaining_time(endtime)) except select.error as e: if e.args[0] == errno.EINTR: continue raise + # According to the docs, returning three empty lists indicates + # that the timeout expired. + if not (rlist or wlist or xlist): + raise TimeoutExpired(self.args, orig_timeout) + # We also check what time it is ourselves for good measure. + self._check_timeout(endtime, orig_timeout) + # XXX Rewrite these to use non-blocking I/O on the # file objects; they are no longer using C stdio! if self.stdin in wlist: - chunk = input[input_offset : input_offset + _PIPE_BUF] + chunk = self._input[self._input_offset : + self._input_offset + _PIPE_BUF] bytes_written = os.write(self.stdin.fileno(), chunk) - input_offset += bytes_written - if input_offset >= len(input): + self._input_offset += bytes_written + if self._input_offset >= len(self._input): self.stdin.close() - write_set.remove(self.stdin) + self._write_set.remove(self.stdin) if self.stdout in rlist: data = os.read(self.stdout.fileno(), 1024) if not data: self.stdout.close() - read_set.remove(self.stdout) + self._read_set.remove(self.stdout) stdout.append(data) if self.stderr in rlist: data = os.read(self.stderr.fileno(), 1024) if not data: self.stderr.close() - read_set.remove(self.stderr) + self._read_set.remove(self.stderr) stderr.append(data) return (stdout, stderr) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 0f9d1da..6b663f4 100644 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -1084,7 +1084,7 @@ class TarInfo(object): def create_pax_global_header(cls, pax_headers): """Return the object as a pax global header block sequence. """ - return cls._create_pax_generic_header(pax_headers, XGLTYPE, "utf8") + return cls._create_pax_generic_header(pax_headers, XGLTYPE, "utf-8") def _posix_split_name(self, name): """Split a name longer than 100 chars into a prefix @@ -1167,7 +1167,7 @@ class TarInfo(object): binary = False for keyword, value in pax_headers.items(): try: - value.encode("utf8", "strict") + value.encode("utf-8", "strict") except UnicodeEncodeError: binary = True break @@ -1178,13 +1178,13 @@ class TarInfo(object): records += b"21 hdrcharset=BINARY\n" for keyword, value in pax_headers.items(): - keyword = keyword.encode("utf8") + keyword = keyword.encode("utf-8") if binary: # Try to restore the original byte representation of `value'. # Needless to say, that the encoding must match the string. value = value.encode(encoding, "surrogateescape") else: - value = value.encode("utf8") + value = value.encode("utf-8") l = len(keyword) + len(value) + 3 # ' ' + '=' + '\n' n = p = 0 @@ -1393,7 +1393,7 @@ class TarInfo(object): # the translation to UTF-8 fails. match = re.search(br"\d+ hdrcharset=([^\n]+)\n", buf) if match is not None: - pax_headers["hdrcharset"] = match.group(1).decode("utf8") + pax_headers["hdrcharset"] = match.group(1).decode("utf-8") # For the time being, we don't care about anything other than "BINARY". # The only other value that is currently allowed by the standard is @@ -1402,7 +1402,7 @@ class TarInfo(object): if hdrcharset == "BINARY": encoding = tarfile.encoding else: - encoding = "utf8" + encoding = "utf-8" # Parse pax header information. A record looks like that: # "%d %s=%s\n" % (length, keyword, value). length is the size @@ -1419,20 +1419,20 @@ class TarInfo(object): length = int(length) value = buf[match.end(2) + 1:match.start(1) + length - 1] - # Normally, we could just use "utf8" as the encoding and "strict" + # Normally, we could just use "utf-8" as the encoding and "strict" # as the error handler, but we better not take the risk. For # example, GNU tar <= 1.23 is known to store filenames it cannot # translate to UTF-8 as raw strings (unfortunately without a # hdrcharset=BINARY header). # We first try the strict standard encoding, and if that fails we # fall back on the user's encoding and error handler. - keyword = self._decode_pax_field(keyword, "utf8", "utf8", + keyword = self._decode_pax_field(keyword, "utf-8", "utf-8", tarfile.errors) if keyword in PAX_NAME_FIELDS: value = self._decode_pax_field(value, encoding, tarfile.encoding, tarfile.errors) else: - value = self._decode_pax_field(value, "utf8", "utf8", + value = self._decode_pax_field(value, "utf-8", "utf-8", tarfile.errors) pax_headers[keyword] = value diff --git a/Lib/test/crashers/README b/Lib/test/crashers/README index 2a73e1b..0259a06 100644 --- a/Lib/test/crashers/README +++ b/Lib/test/crashers/README @@ -14,3 +14,7 @@ note if the cause is system or environment dependent and what the variables are. Once the crash is fixed, the test case should be moved into an appropriate test (even if it was originally from the test suite). This ensures the regression doesn't happen again. And if it does, it should be easier to track down. + +Also see Lib/test_crashers.py which exercises the crashers in this directory. +In particular, make sure to add any new infinite loop crashers to the black +list so it doesn't try to run them. diff --git a/Lib/test/crashers/compiler_recursion.py b/Lib/test/crashers/compiler_recursion.py index 4954bdd..31f28a9 100644 --- a/Lib/test/crashers/compiler_recursion.py +++ b/Lib/test/crashers/compiler_recursion.py @@ -1,5 +1,13 @@ """ -The compiler (>= 2.5) recurses happily. +The compiler (>= 2.5) recurses happily until it blows the stack. + +Recorded on the tracker as http://bugs.python.org/issue11383 """ -compile('()'*9**5, '?', 'exec') +# The variant below blows up in compiler_call, but there are assorted +# other variations that blow up in other functions +# e.g. '1*'*10**5+'1' will die in compiler_visit_expr + +# The exact limit to destroy the stack will vary by platform +# but 10M should do the trick even with huge stack allocations +compile('()'*10**7, '?', 'exec') diff --git a/Lib/test/list_tests.py b/Lib/test/list_tests.py index e3a7845..0c656fd 100644 --- a/Lib/test/list_tests.py +++ b/Lib/test/list_tests.py @@ -425,6 +425,47 @@ class CommonTest(seq_tests.CommonTest): self.assertRaises(TypeError, u.reverse, 42) + def test_clear(self): + u = self.type2test([2, 3, 4]) + u.clear() + self.assertEqual(u, []) + + u = self.type2test([]) + u.clear() + self.assertEqual(u, []) + + u = self.type2test([]) + u.append(1) + u.clear() + u.append(2) + self.assertEqual(u, [2]) + + self.assertRaises(TypeError, u.clear, None) + + def test_copy(self): + u = self.type2test([1, 2, 3]) + v = u.copy() + self.assertEqual(v, [1, 2, 3]) + + u = self.type2test([]) + v = u.copy() + self.assertEqual(v, []) + + # test that it's indeed a copy and not a reference + u = self.type2test(['a', 'b']) + v = u.copy() + v.append('i') + self.assertEqual(u, ['a', 'b']) + self.assertEqual(v, u + ['i']) + + # test that it's a shallow, not a deep copy + u = self.type2test([1, 2, [3, 4], 5]) + v = u.copy() + self.assertEqual(u, v) + self.assertIs(v[3], u[3]) + + self.assertRaises(TypeError, u.copy, None) + def test_sort(self): u = self.type2test([1, 0]) u.sort() diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index a843486..45e8f74 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -3,9 +3,10 @@ import unittest import pickle import pickletools import copyreg +import weakref from http.cookies import SimpleCookie -from test.support import TestFailed, TESTFN, run_with_locale +from test.support import TestFailed, TESTFN, run_with_locale, no_tracing from pickle import bytes_types @@ -842,6 +843,25 @@ class AbstractPickleTests(unittest.TestCase): self.assertEqual(B(x), B(y), detail) self.assertEqual(x.__dict__, y.__dict__, detail) + def test_newobj_proxies(self): + # NEWOBJ should use the __class__ rather than the raw type + classes = myclasses[:] + # Cannot create weakproxies to these classes + for c in (MyInt, MyTuple): + classes.remove(c) + for proto in protocols: + for C in classes: + B = C.__base__ + x = C(C.sample) + x.foo = 42 + p = weakref.proxy(x) + s = self.dumps(p, proto) + y = self.loads(s) + self.assertEqual(type(y), type(x)) # rather than type(p) + detail = (proto, C, B, x, y, type(y)) + self.assertEqual(B(x), B(y), detail) + self.assertEqual(x.__dict__, y.__dict__, detail) + # Register a type with copyreg, with extension code extcode. Pickle # an object of that type. Check that the resulting pickle uses opcode # (EXT[124]) under proto 2, and not in proto 1. @@ -1002,13 +1022,13 @@ class AbstractPickleTests(unittest.TestCase): y = self.loads(s) self.assertEqual(y._reduce_called, 1) + @no_tracing def test_bad_getattr(self): x = BadGetattr() for proto in 0, 1: self.assertRaises(RuntimeError, self.dumps, x, proto) # protocol 2 don't raise a RuntimeError. d = self.dumps(x, 2) - self.assertRaises(RuntimeError, self.loads, d) def test_reduce_bad_iterator(self): # Issue4176: crash when 4th and 5th items of __reduce__() diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 238f276..a00f15a 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -374,6 +374,13 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, forever = True elif o in ('-j', '--multiprocess'): use_mp = int(a) + if use_mp <= 0: + try: + import multiprocessing + # Use all cores + extras for tests that like to sleep + use_mp = 2 + multiprocessing.cpu_count() + except (ImportError, NotImplementedError): + use_mp = 3 elif o == '--header': header = True elif o == '--slaveargs': @@ -732,9 +739,9 @@ def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): tests = [] others = set(stdtests) | nottests for name in names: - modname, ext = os.path.splitext(name) - if modname[:5] == "test_" and ext == ".py" and modname not in others: - tests.append(modname) + mod, ext = os.path.splitext(name) + if mod[:5] == "test_" and ext in (".py", "") and mod not in others: + tests.append(mod) return stdtests + sorted(tests) def replace_stdout(): @@ -827,7 +834,7 @@ class saved_test_environment: resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr', 'os.environ', 'sys.path', 'sys.path_hooks', '__import__', 'warnings.filters', 'asyncore.socket_map', - 'logging._handlers', 'logging._handlerList', + 'logging._handlers', 'logging._handlerList', 'sys.gettrace', 'sys.warnoptions') def get_sys_argv(self): @@ -875,6 +882,11 @@ class saved_test_environment: sys.path_hooks = saved_hooks[1] sys.path_hooks[:] = saved_hooks[2] + def get_sys_gettrace(self): + return sys.gettrace() + def restore_sys_gettrace(self, trace_fxn): + sys.settrace(trace_fxn) + def get___import__(self): return builtins.__import__ def restore___import__(self, import_): @@ -1055,7 +1067,8 @@ def dash_R(the_module, test, indirect_test, huntrleaks): False if the test didn't leak references; True if we detected refleaks. """ # This code is hackish and inelegant, but it seems to do the job. - import copyreg, _abcoll + import copyreg + import collections.abc if not hasattr(sys, 'gettotalrefcount'): raise Exception("Tracking reference leaks requires a debug build " @@ -1072,7 +1085,7 @@ def dash_R(the_module, test, indirect_test, huntrleaks): else: zdc = zipimport._zip_directory_cache.copy() abcs = {} - for abc in [getattr(_abcoll, a) for a in _abcoll.__all__]: + for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: if not isabstract(abc): continue for obj in abc.__subclasses__() + [abc]: @@ -1118,7 +1131,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs): import gc, copyreg import _strptime, linecache import urllib.parse, urllib.request, mimetypes, doctest - import struct, filecmp, _abcoll + import struct, filecmp, collections.abc from distutils.dir_util import _path_created from weakref import WeakSet @@ -1145,7 +1158,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs): sys._clear_type_cache() # Clear ABC registries, restoring previously saved ABC registries. - for abc in [getattr(_abcoll, a) for a in _abcoll.__all__]: + for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: if not isabstract(abc): continue for obj in abc.__subclasses__() + [abc]: diff --git a/Lib/test/support.py b/Lib/test/support.py index 8d8e187..52ec232 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -15,7 +15,7 @@ import shutil import warnings import unittest import importlib -import collections +import collections.abc import re import subprocess import imp @@ -33,7 +33,7 @@ __all__ = [ "verbose", "use_resources", "max_memuse", "record_original_stdout", "get_original_stdout", "unload", "unlink", "rmtree", "forget", "is_resource_enabled", "requires", "find_unused_port", "bind_port", - "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd", + "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource", "captured_output", "captured_stdout", @@ -381,24 +381,6 @@ def bind_port(sock, host=HOST): port = sock.getsockname()[1] return port -FUZZ = 1e-6 - -def fcmp(x, y): # fuzzy comparison function - if isinstance(x, float) or isinstance(y, float): - try: - fuzz = (abs(x) + abs(y)) * FUZZ - if abs(x-y) <= fuzz: - return 0 - except: - pass - elif type(x) == type(y) and isinstance(x, (tuple, list)): - for i in range(min(len(x), len(y))): - outcome = fcmp(x[i], y[i]) - if outcome != 0: - return outcome - return (len(x) > len(y)) - (len(x) < len(y)) - return (x > y) - (x < y) - # decorator for skipping tests on non-IEEE 754 platforms requires_IEEE_754 = unittest.skipUnless( float.__getformat__("double").startswith("IEEE"), @@ -714,7 +696,7 @@ class CleanImport(object): sys.modules.update(self.original_modules) -class EnvironmentVarGuard(collections.MutableMapping): +class EnvironmentVarGuard(collections.abc.MutableMapping): """Class to help protect the environment variable properly. Can be used as a context manager.""" @@ -1142,6 +1124,32 @@ def check_impl_detail(**guards): return guards.get(platform.python_implementation().lower(), default) +def no_tracing(func): + """Decorator to temporarily turn off tracing for the duration of a test.""" + if not hasattr(sys, 'gettrace'): + return func + else: + @functools.wraps(func) + def wrapper(*args, **kwargs): + original_trace = sys.gettrace() + try: + sys.settrace(None) + return func(*args, **kwargs) + finally: + sys.settrace(original_trace) + return wrapper + + +def refcount_test(test): + """Decorator for tests which involve reference counting. + + To start, the decorator does not run the test if is not run by CPython. + After that, any trace function is unset during the test to prevent + unexpected refcounts caused by the trace function. + + """ + return no_tracing(cpython_only(test)) + def _run_suite(suite): """Run tests from a unittest.TestSuite-derived class.""" @@ -1366,7 +1374,7 @@ def strip_python_stderr(stderr): def args_from_interpreter_flags(): """Return a list of command-line arguments reproducing the current - settings in sys.flags.""" + settings in sys.flags and sys.warnoptions.""" flag_opt_map = { 'bytes_warning': 'b', 'dont_write_bytecode': 'B', @@ -1381,6 +1389,8 @@ def args_from_interpreter_flags(): v = getattr(sys.flags, flag) if v > 0: args.append('-' + opt * v) + for opt in sys.warnoptions: + args.append('-W' + opt) return args #============================================================ @@ -1454,9 +1464,11 @@ def can_symlink(): global _can_symlink if _can_symlink is not None: return _can_symlink + symlink_path = TESTFN + "can_symlink" try: - os.symlink(TESTFN, TESTFN + "can_symlink") + os.symlink(TESTFN, symlink_path) can = True + os.remove(symlink_path) except (OSError, NotImplementedError): can = False _can_symlink = can diff --git a/Lib/test/test_abc.py b/Lib/test/test_abc.py index d86f97c..1319a64 100644 --- a/Lib/test/test_abc.py +++ b/Lib/test/test_abc.py @@ -121,11 +121,32 @@ class TestABC(unittest.TestCase): self.assertFalse(issubclass(B, (A,))) self.assertNotIsInstance(b, A) self.assertNotIsInstance(b, (A,)) - A.register(B) + B1 = A.register(B) + self.assertTrue(issubclass(B, A)) + self.assertTrue(issubclass(B, (A,))) + self.assertIsInstance(b, A) + self.assertIsInstance(b, (A,)) + self.assertIs(B1, B) + class C(B): + pass + c = C() + self.assertTrue(issubclass(C, A)) + self.assertTrue(issubclass(C, (A,))) + self.assertIsInstance(c, A) + self.assertIsInstance(c, (A,)) + + def test_register_as_class_deco(self): + class A(metaclass=abc.ABCMeta): + pass + @A.register + class B(object): + pass + b = B() self.assertTrue(issubclass(B, A)) self.assertTrue(issubclass(B, (A,))) self.assertIsInstance(b, A) self.assertIsInstance(b, (A,)) + @A.register class C(B): pass c = C() @@ -133,6 +154,7 @@ class TestABC(unittest.TestCase): self.assertTrue(issubclass(C, (A,))) self.assertIsInstance(c, A) self.assertIsInstance(c, (A,)) + self.assertIs(C, A.register(C)) def test_isinstance_invalidation(self): class A(metaclass=abc.ABCMeta): diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py index 03c95fa..8d80336 100644 --- a/Lib/test/test_argparse.py +++ b/Lib/test/test_argparse.py @@ -4328,7 +4328,7 @@ class TestEncoding(TestCase): def _test_module_encoding(self, path): path, _ = os.path.splitext(path) path += ".py" - with codecs.open(path, 'r', 'utf8') as f: + with codecs.open(path, 'r', 'utf-8') as f: f.read() def test_argparse_module_encoding(self): diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 53c49a8..389dc8a 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -352,7 +352,7 @@ class DispatcherWithSendTests(unittest.TestCase): @support.reap_threads def test_send(self): evt = threading.Event() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket() sock.settimeout(3) port = support.bind_port(sock) @@ -367,7 +367,7 @@ class DispatcherWithSendTests(unittest.TestCase): data = b"Suppose there isn't a 16-ton weight?" d = dispatcherwithsend_noread() - d.create_socket(socket.AF_INET, socket.SOCK_STREAM) + d.create_socket() d.connect((HOST, port)) # give time for socket to connect @@ -474,7 +474,7 @@ class TCPServer(asyncore.dispatcher): def __init__(self, handler=BaseTestHandler, host=HOST, port=0): asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.set_reuse_addr() self.bind((host, port)) self.listen(5) @@ -495,7 +495,7 @@ class BaseClient(BaseTestHandler): def __init__(self, address): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.connect(address) def handle_connect(self): @@ -536,7 +536,7 @@ class BaseTestAPI(unittest.TestCase): def __init__(self): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.bind((HOST, 0)) self.listen(5) self.address = self.socket.getsockname()[:2] @@ -555,7 +555,7 @@ class BaseTestAPI(unittest.TestCase): def __init__(self): BaseTestHandler.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.create_socket() self.bind((HOST, 0)) self.listen(5) self.address = self.socket.getsockname()[:2] @@ -693,20 +693,20 @@ class BaseTestAPI(unittest.TestCase): def test_create_socket(self): s = asyncore.dispatcher() - s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s.create_socket() self.assertEqual(s.socket.family, socket.AF_INET) SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) self.assertEqual(s.socket.type, socket.SOCK_STREAM | SOCK_NONBLOCK) def test_bind(self): s1 = asyncore.dispatcher() - s1.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s1.create_socket() s1.bind((HOST, 0)) s1.listen(5) port = s1.socket.getsockname()[1] s2 = asyncore.dispatcher() - s2.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s2.create_socket() # EADDRINUSE indicates the socket was correctly bound self.assertRaises(socket.error, s2.bind, (HOST, port)) @@ -723,7 +723,7 @@ class BaseTestAPI(unittest.TestCase): self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)) s.socket.close() - s.create_socket(socket.AF_INET, socket.SOCK_STREAM) + s.create_socket() s.set_reuse_addr() self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)) diff --git a/Lib/test/test_bigmem.py b/Lib/test/test_bigmem.py index ac6b109..f9a0a3d 100644 --- a/Lib/test/test_bigmem.py +++ b/Lib/test/test_bigmem.py @@ -707,7 +707,7 @@ class StrTest(unittest.TestCase, BaseStrTest): class BytesTest(unittest.TestCase, BaseStrTest): def from_latin1(self, s): - return s.encode("latin1") + return s.encode("latin-1") @bigmemtest(minsize=_2G + 2, memuse=1 + character_size) def test_decode(self, size): @@ -718,7 +718,7 @@ class BytesTest(unittest.TestCase, BaseStrTest): class BytearrayTest(unittest.TestCase, BaseStrTest): def from_latin1(self, s): - return bytearray(s.encode("latin1")) + return bytearray(s.encode("latin-1")) @bigmemtest(minsize=_2G + 2, memuse=1 + character_size) def test_decode(self, size): diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index 1469e36..b094a65 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -10,7 +10,7 @@ import ast import types import builtins import random -from test.support import fcmp, TESTFN, unlink, run_unittest, check_warnings +from test.support import TESTFN, unlink, run_unittest, check_warnings from operator import neg @@ -394,10 +394,13 @@ class BuiltinTest(unittest.TestCase): self.assertEqual(divmod(-sys.maxsize-1, -1), (sys.maxsize+1, 0)) - self.assertTrue(not fcmp(divmod(3.25, 1.0), (3.0, 0.25))) - self.assertTrue(not fcmp(divmod(-3.25, 1.0), (-4.0, 0.75))) - self.assertTrue(not fcmp(divmod(3.25, -1.0), (-4.0, -0.75))) - self.assertTrue(not fcmp(divmod(-3.25, -1.0), (3.0, -0.25))) + for num, denom, exp_result in [ (3.25, 1.0, (3.0, 0.25)), + (-3.25, 1.0, (-4.0, 0.75)), + (3.25, -1.0, (-4.0, -0.75)), + (-3.25, -1.0, (3.0, -0.25))]: + result = divmod(num, denom) + self.assertAlmostEqual(result[0], exp_result[0]) + self.assertAlmostEqual(result[1], exp_result[1]) self.assertRaises(TypeError, divmod) @@ -1276,14 +1279,14 @@ class BuiltinTest(unittest.TestCase): # -------------------------------------------------------------------- # Issue #7994: object.__format__ with a non-empty format string is - # pending deprecated + # deprecated def test_deprecated_format_string(obj, fmt_str, should_raise_warning): with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always", PendingDeprecationWarning) + warnings.simplefilter("always", DeprecationWarning) format(obj, fmt_str) if should_raise_warning: self.assertEqual(len(w), 1) - self.assertIsInstance(w[0].message, PendingDeprecationWarning) + self.assertIsInstance(w[0].message, DeprecationWarning) self.assertIn('object.__format__ with a non-empty format ' 'string', str(w[0].message)) else: diff --git a/Lib/test/test_bytes.py b/Lib/test/test_bytes.py index 9be1008..0b70c3a 100644 --- a/Lib/test/test_bytes.py +++ b/Lib/test/test_bytes.py @@ -188,24 +188,26 @@ class BaseBytesTest(unittest.TestCase): def test_encoding(self): sample = "Hello world\n\u1234\u5678\u9abc" - for enc in ("utf8", "utf16"): + for enc in ("utf-8", "utf-16"): b = self.type2test(sample, enc) self.assertEqual(b, self.type2test(sample.encode(enc))) - self.assertRaises(UnicodeEncodeError, self.type2test, sample, "latin1") - b = self.type2test(sample, "latin1", "ignore") + self.assertRaises(UnicodeEncodeError, self.type2test, sample, "latin-1") + b = self.type2test(sample, "latin-1", "ignore") self.assertEqual(b, self.type2test(sample[:-3], "utf-8")) def test_decode(self): sample = "Hello world\n\u1234\u5678\u9abc\def0\def0" - for enc in ("utf8", "utf16"): + for enc in ("utf-8", "utf-16"): b = self.type2test(sample, enc) self.assertEqual(b.decode(enc), sample) sample = "Hello world\n\x80\x81\xfe\xff" - b = self.type2test(sample, "latin1") - self.assertRaises(UnicodeDecodeError, b.decode, "utf8") - self.assertEqual(b.decode("utf8", "ignore"), "Hello world\n") - self.assertEqual(b.decode(errors="ignore", encoding="utf8"), + b = self.type2test(sample, "latin-1") + self.assertRaises(UnicodeDecodeError, b.decode, "utf-8") + self.assertEqual(b.decode("utf-8", "ignore"), "Hello world\n") + self.assertEqual(b.decode(errors="ignore", encoding="utf-8"), "Hello world\n") + # Default encoding is utf-8 + self.assertEqual(self.type2test(b'\xe2\x98\x83').decode(), '\u2603') def test_from_int(self): b = self.type2test(0) @@ -562,6 +564,39 @@ class ByteArrayTest(BaseBytesTest): b.reverse() self.assertFalse(b) + def test_clear(self): + b = bytearray(b'python') + b.clear() + self.assertEqual(b, b'') + + b = bytearray(b'') + b.clear() + self.assertEqual(b, b'') + + b = bytearray(b'') + b.append(ord('r')) + b.clear() + b.append(ord('p')) + self.assertEqual(b, b'p') + + def test_copy(self): + b = bytearray(b'abc') + bb = b.copy() + self.assertEqual(bb, b'abc') + + b = bytearray(b'') + bb = b.copy() + self.assertEqual(bb, b'') + + # test that it's indeed a copy and not a reference + b = bytearray(b'abc') + bb = b.copy() + self.assertEqual(b, bb) + self.assertIsNot(b, bb) + bb.append(ord('d')) + self.assertEqual(bb, b'abcd') + self.assertEqual(b, b'abc') + def test_regexps(self): def by(s): return bytearray(map(ord, s)) diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py index c4e3adf..a0a85ae 100644 --- a/Lib/test/test_cmd_line.py +++ b/Lib/test/test_cmd_line.py @@ -151,7 +151,7 @@ class CmdLineTest(unittest.TestCase): @unittest.skipUnless(sys.platform == 'darwin', 'test specific to Mac OS X') def test_osx_utf8(self): def check_output(text): - decoded = text.decode('utf8', 'surrogateescape') + decoded = text.decode('utf-8', 'surrogateescape') expected = ascii(decoded).encode('ascii') + b'\n' env = os.environ.copy() @@ -223,7 +223,7 @@ class CmdLineTest(unittest.TestCase): self.assertIn(path2.encode('ascii'), out) def test_displayhook_unencodable(self): - for encoding in ('ascii', 'latin1', 'utf8'): + for encoding in ('ascii', 'latin-1', 'utf-8'): env = os.environ.copy() env['PYTHONIOENCODING'] = encoding p = subprocess.Popen( diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index d560d7a..67a5aed 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1250,7 +1250,7 @@ class EncodedFileTest(unittest.TestCase): self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae') f = io.BytesIO() - ef = codecs.EncodedFile(f, 'utf-8', 'latin1') + ef = codecs.EncodedFile(f, 'utf-8', 'latin-1') ef.write(b'\xc3\xbc') self.assertEqual(f.getvalue(), b'\xfc') @@ -1611,7 +1611,7 @@ class SurrogateEscapeTest(unittest.TestCase): def test_latin1(self): # Issue6373 - self.assertEqual("\udce4\udceb\udcef\udcf6\udcfc".encode("latin1", "surrogateescape"), + self.assertEqual("\udce4\udceb\udcef\udcf6\udcfc".encode("latin-1", "surrogateescape"), b"\xe4\xeb\xef\xf6\xfc") diff --git a/Lib/test/test_collections.py b/Lib/test/test_collections.py index b3e0907..d71fb01 100644 --- a/Lib/test/test_collections.py +++ b/Lib/test/test_collections.py @@ -10,17 +10,18 @@ from random import randrange, shuffle import keyword import re import sys -from collections import _ChainMap as ChainMap -from collections import Hashable, Iterable, Iterator -from collections import Sized, Container, Callable -from collections import Set, MutableSet -from collections import Mapping, MutableMapping, KeysView, ItemsView, UserDict -from collections import Sequence, MutableSequence -from collections import ByteString +from collections import UserDict +from collections import ChainMap +from collections.abc import Hashable, Iterable, Iterator +from collections.abc import Sized, Container, Callable +from collections.abc import Set, MutableSet +from collections.abc import Mapping, MutableMapping, KeysView, ItemsView +from collections.abc import Sequence, MutableSequence +from collections.abc import ByteString ################################################################################ -### _ChainMap (helper class for configparser) +### ChainMap (helper class for configparser and the string module) ################################################################################ class TestChainMap(unittest.TestCase): @@ -71,17 +72,23 @@ class TestChainMap(unittest.TestCase): for m1, m2 in zip(d.maps, e.maps): self.assertIsNot(m1, m2, e) - d.new_child() - d['b'] = 5 - self.assertEqual(d.maps, [{'b': 5}, {'c':30}, {'a':1, 'b':2}]) - self.assertEqual(d.parents.maps, [{'c':30}, {'a':1, 'b':2}]) # check parents - self.assertEqual(d['b'], 5) # find first in chain - self.assertEqual(d.parents['b'], 2) # look beyond maps[0] + f = d.new_child() + f['b'] = 5 + self.assertEqual(f.maps, [{'b': 5}, {'c':30}, {'a':1, 'b':2}]) + self.assertEqual(f.parents.maps, [{'c':30}, {'a':1, 'b':2}]) # check parents + self.assertEqual(f['b'], 5) # find first in chain + self.assertEqual(f.parents['b'], 2) # look beyond maps[0] def test_contructor(self): - self.assertEqual(ChainedContext().maps, [{}]) # no-args --> one new dict + self.assertEqual(ChainMap().maps, [{}]) # no-args --> one new dict self.assertEqual(ChainMap({1:2}).maps, [{1:2}]) # 1 arg --> list + def test_bool(self): + self.assertFalse(ChainMap()) + self.assertFalse(ChainMap({}, {})) + self.assertTrue(ChainMap({1:2}, {})) + self.assertTrue(ChainMap({}, {1:2})) + def test_missing(self): class DefaultChainMap(ChainMap): def __missing__(self, key): @@ -721,6 +728,44 @@ class TestCollectionABCs(ABCTestCase): self.validate_abstract_methods(MutableSequence, '__contains__', '__iter__', '__len__', '__getitem__', '__setitem__', '__delitem__', 'insert') + def test_MutableSequence_mixins(self): + # Test the mixins of MutableSequence by creating a miminal concrete + # class inherited from it. + class MutableSequenceSubclass(MutableSequence): + def __init__(self): + self.lst = [] + + def __setitem__(self, index, value): + self.lst[index] = value + + def __getitem__(self, index): + return self.lst[index] + + def __len__(self): + return len(self.lst) + + def __delitem__(self, index): + del self.lst[index] + + def insert(self, index, value): + self.lst.insert(index, value) + + mss = MutableSequenceSubclass() + mss.append(0) + mss.extend((1, 2, 3, 4)) + self.assertEqual(len(mss), 5) + self.assertEqual(mss[3], 3) + mss.reverse() + self.assertEqual(mss[3], 1) + mss.pop() + self.assertEqual(len(mss), 4) + mss.remove(3) + self.assertEqual(len(mss), 3) + mss += (10, 20, 30) + self.assertEqual(len(mss), 6) + self.assertEqual(mss[-1], 30) + mss.clear() + self.assertEqual(len(mss), 0) ################################################################################ ### Counter @@ -1181,7 +1226,7 @@ import doctest, collections def test_main(verbose=None): NamedTupleDocs = doctest.DocTestSuite(module=collections) test_classes = [TestNamedTuple, NamedTupleDocs, TestOneTrickPonyABCs, - TestCollectionABCs, TestCounter, + TestCollectionABCs, TestCounter, TestChainMap, TestOrderedDict, GeneralMappingTests, SubclassMappingTests] support.run_unittest(*test_classes) support.run_doctest(collections, verbose) diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py index 250d31b..a63af4c 100644 --- a/Lib/test/test_compileall.py +++ b/Lib/test/test_compileall.py @@ -345,7 +345,7 @@ class CommandLineTests(unittest.TestCase): def test_invalid_arg_produces_message(self): out = self.assertRunOK('badfilename') - self.assertRegex(out, b"Can't list badfilename") + self.assertRegex(out, b"Can't list 'badfilename'") def test_main(): diff --git a/Lib/test/test_cfgparser.py b/Lib/test/test_configparser.py index f7d9a26..f7d9a26 100644 --- a/Lib/test/test_cfgparser.py +++ b/Lib/test/test_configparser.py diff --git a/Lib/test/test_crashers.py b/Lib/test/test_crashers.py new file mode 100644 index 0000000..336ccbe --- /dev/null +++ b/Lib/test/test_crashers.py @@ -0,0 +1,38 @@ +# Tests that the crashers in the Lib/test/crashers directory actually +# do crash the interpreter as expected +# +# If a crasher is fixed, it should be moved elsewhere in the test suite to +# ensure it continues to work correctly. + +import unittest +import glob +import os.path +import test.support +from test.script_helper import assert_python_failure + +CRASHER_DIR = os.path.join(os.path.dirname(__file__), "crashers") +CRASHER_FILES = os.path.join(CRASHER_DIR, "*.py") + +infinite_loops = ["infinite_loop_re.py", "nasty_eq_vs_dict.py"] + +class CrasherTest(unittest.TestCase): + + @unittest.skip("these tests are too fragile") + @test.support.cpython_only + def test_crashers_crash(self): + for fname in glob.glob(CRASHER_FILES): + if os.path.basename(fname) in infinite_loops: + continue + # Some "crashers" only trigger an exception rather than a + # segfault. Consider that an acceptable outcome. + if test.support.verbose: + print("Checking crasher:", fname) + assert_python_failure(fname) + + +def test_main(): + test.support.run_unittest(CrasherTest) + test.support.reap_children() + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_crypt.py b/Lib/test/test_crypt.py index 2adb28d..dc107d8 100644 --- a/Lib/test/test_crypt.py +++ b/Lib/test/test_crypt.py @@ -10,6 +10,25 @@ class CryptTestCase(unittest.TestCase): if support.verbose: print('Test encryption: ', c) + def test_salt(self): + self.assertEqual(len(crypt._saltchars), 64) + for method in crypt.methods: + salt = crypt.mksalt(method) + self.assertEqual(len(salt), + method.salt_chars + (3 if method.ident else 0)) + + def test_saltedcrypt(self): + for method in crypt.methods: + pw = crypt.crypt('assword', method) + self.assertEqual(len(pw), method.total_size) + pw = crypt.crypt('assword', crypt.mksalt(method)) + self.assertEqual(len(pw), method.total_size) + + def test_methods(self): + # Gurantee that METHOD_CRYPT is the last method in crypt.methods. + self.assertTrue(len(crypt.methods) >= 1) + self.assertEqual(crypt.METHOD_CRYPT, crypt.methods[-1]) + def test_main(): support.run_unittest(CryptTestCase) diff --git a/Lib/test/test_descr.py b/Lib/test/test_descr.py index 41dfa70..b0d531a 100644 --- a/Lib/test/test_descr.py +++ b/Lib/test/test_descr.py @@ -4243,6 +4243,8 @@ class DictProxyTests(unittest.TestCase): pass self.C = C + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __local__') def test_iter_keys(self): # Testing dict-proxy keys... it = self.C.__dict__.keys() @@ -4252,6 +4254,8 @@ class DictProxyTests(unittest.TestCase): self.assertEqual(keys, ['__dict__', '__doc__', '__module__', '__weakref__', 'meth']) + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __local__') def test_iter_values(self): # Testing dict-proxy values... it = self.C.__dict__.values() @@ -4259,6 +4263,8 @@ class DictProxyTests(unittest.TestCase): values = list(it) self.assertEqual(len(values), 5) + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __local__') def test_iter_items(self): # Testing dict-proxy iteritems... it = self.C.__dict__.items() diff --git a/Lib/test/test_descrtut.py b/Lib/test/test_descrtut.py index 2db3d33..c3355b9 100644 --- a/Lib/test/test_descrtut.py +++ b/Lib/test/test_descrtut.py @@ -199,6 +199,8 @@ You can get the information from the list type: '__str__', '__subclasshook__', 'append', + 'clear', + 'copy', 'count', 'extend', 'index', diff --git a/Lib/test/test_dis.py b/Lib/test/test_dis.py index 7a61493..643e2e6 100644 --- a/Lib/test/test_dis.py +++ b/Lib/test/test_dis.py @@ -1,11 +1,35 @@ # Minimal tests for dis module from test.support import run_unittest, captured_stdout +import difflib import unittest import sys import dis import io +class _C: + def __init__(self, x): + self.x = x == 1 + +dis_c_instance_method = """\ + %-4d 0 LOAD_FAST 1 (x) + 3 LOAD_CONST 1 (1) + 6 COMPARE_OP 2 (==) + 9 LOAD_FAST 0 (self) + 12 STORE_ATTR 0 (x) + 15 LOAD_CONST 0 (None) + 18 RETURN_VALUE +""" % (_C.__init__.__code__.co_firstlineno + 1,) + +dis_c_instance_method_bytes = """\ + 0 LOAD_FAST 1 (1) + 3 LOAD_CONST 1 (1) + 6 COMPARE_OP 2 (==) + 9 LOAD_FAST 0 (0) + 12 STORE_ATTR 0 (0) + 15 LOAD_CONST 0 (0) + 18 RETURN_VALUE +""" def _f(a): print(a) @@ -23,6 +47,16 @@ dis_f = """\ _f.__code__.co_firstlineno + 2) +dis_f_co_code = """\ + 0 LOAD_GLOBAL 0 (0) + 3 LOAD_FAST 0 (0) + 6 CALL_FUNCTION 1 + 9 POP_TOP + 10 LOAD_CONST 1 (1) + 13 RETURN_VALUE +""" + + def bug708901(): for res in range(1, 10): @@ -138,18 +172,27 @@ dis_compound_stmt_str = """\ """ class DisTests(unittest.TestCase): - def do_disassembly_test(self, func, expected): + + def get_disassembly(self, func, lasti=-1, wrapper=True): s = io.StringIO() save_stdout = sys.stdout sys.stdout = s - dis.dis(func) - sys.stdout = save_stdout - got = s.getvalue() + try: + if wrapper: + dis.dis(func) + else: + dis.disassemble(func, lasti) + finally: + sys.stdout = save_stdout # Trim trailing blanks (if any). - lines = got.split('\n') - lines = [line.rstrip() for line in lines] - expected = expected.split("\n") - import difflib + return [line.rstrip() for line in s.getvalue().splitlines()] + + def get_disassemble_as_string(self, func, lasti=-1): + return '\n'.join(self.get_disassembly(func, lasti, False)) + + def do_disassembly_test(self, func, expected): + lines = self.get_disassembly(func) + expected = expected.splitlines() if expected != lines: self.fail( "events did not match expectation:\n" + @@ -211,6 +254,44 @@ class DisTests(unittest.TestCase): self.do_disassembly_test(simple_stmt_str, dis_simple_stmt_str) self.do_disassembly_test(compound_stmt_str, dis_compound_stmt_str) + def test_disassemble_bytes(self): + self.do_disassembly_test(_f.__code__.co_code, dis_f_co_code) + + def test_disassemble_method(self): + self.do_disassembly_test(_C(1).__init__, dis_c_instance_method) + + def test_disassemble_method_bytes(self): + method_bytecode = _C(1).__init__.__code__.co_code + self.do_disassembly_test(method_bytecode, dis_c_instance_method_bytes) + + def test_dis_none(self): + try: + del sys.last_traceback + except AttributeError: + pass + self.assertRaises(RuntimeError, dis.dis, None) + + def test_dis_object(self): + self.assertRaises(TypeError, dis.dis, object()) + + def test_dis_traceback(self): + try: + del sys.last_traceback + except AttributeError: + pass + + try: + 1/0 + except Exception as e: + tb = e.__traceback__ + sys.last_traceback = tb + + tb_dis = self.get_disassemble_as_string(tb.tb_frame.f_code, tb.tb_lasti) + self.do_disassembly_test(None, tb_dis) + + def test_dis_object(self): + self.assertRaises(TypeError, dis.dis, object()) + code_info_code_info = """\ Name: code_info Filename: (.*) @@ -363,6 +444,13 @@ class CodeInfoTests(unittest.TestCase): dis.show_code(x) self.assertRegex(output.getvalue(), expected+"\n") + def test_code_info_object(self): + self.assertRaises(TypeError, dis.code_info, object()) + + def test_pretty_flags_no_flags(self): + self.assertEqual(dis.pretty_flags(0), '0x0') + + def test_main(): run_unittest(DisTests, CodeInfoTests) diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py index 13836ba..cd87179 100644 --- a/Lib/test/test_doctest.py +++ b/Lib/test/test_doctest.py @@ -5,6 +5,7 @@ Test script for doctest. from test import support import doctest import os +import sys # NOTE: There are some additional tests relating to interaction with @@ -373,7 +374,7 @@ We'll simulate a __file__ attr that ends in pyc: >>> tests = finder.find(sample_func) >>> print(tests) # doctest: +ELLIPSIS - [<DocTest sample_func from ...:17 (1 example)>] + [<DocTest sample_func from ...:18 (1 example)>] The exact name depends on how test_doctest was invoked, so allow for leading path components. @@ -1686,226 +1687,227 @@ Run the debugger on the docstring, and then restore sys.stdin. """ -def test_pdb_set_trace(): - """Using pdb.set_trace from a doctest. - - You can use pdb.set_trace from a doctest. To do so, you must - retrieve the set_trace function from the pdb module at the time - you use it. The doctest module changes sys.stdout so that it can - capture program output. It also temporarily replaces pdb.set_trace - with a version that restores stdout. This is necessary for you to - see debugger output. - - >>> doc = ''' - ... >>> x = 42 - ... >>> raise Exception('clé') - ... Traceback (most recent call last): - ... Exception: clé - ... >>> import pdb; pdb.set_trace() - ... ''' - >>> parser = doctest.DocTestParser() - >>> test = parser.get_doctest(doc, {}, "foo-bar@baz", "foo-bar@baz.py", 0) - >>> runner = doctest.DocTestRunner(verbose=False) - - To demonstrate this, we'll create a fake standard input that - captures our debugger input: - - >>> import tempfile - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'print(x)', # print data defined by the example - ... 'continue', # stop debugging - ... '']) - - >>> try: runner.run(test) - ... finally: sys.stdin = real_stdin - --Return-- - > <doctest foo-bar@baz[2]>(1)<module>()->None - -> import pdb; pdb.set_trace() - (Pdb) print(x) - 42 - (Pdb) continue - TestResults(failed=0, attempted=3) - - You can also put pdb.set_trace in a function called from a test: - - >>> def calls_set_trace(): - ... y=2 - ... import pdb; pdb.set_trace() - - >>> doc = ''' - ... >>> x=1 - ... >>> calls_set_trace() - ... ''' - >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'print(y)', # print data defined in the function - ... 'up', # out of function - ... 'print(x)', # print data defined by the example - ... 'continue', # stop debugging - ... '']) - - >>> try: - ... runner.run(test) - ... finally: - ... sys.stdin = real_stdin - --Return-- - > <doctest test.test_doctest.test_pdb_set_trace[8]>(3)calls_set_trace()->None - -> import pdb; pdb.set_trace() - (Pdb) print(y) - 2 - (Pdb) up - > <doctest foo-bar@baz[1]>(1)<module>() - -> calls_set_trace() - (Pdb) print(x) - 1 - (Pdb) continue - TestResults(failed=0, attempted=2) - - During interactive debugging, source code is shown, even for - doctest examples: - - >>> doc = ''' - ... >>> def f(x): - ... ... g(x*2) - ... >>> def g(x): - ... ... print(x+3) - ... ... import pdb; pdb.set_trace() - ... >>> f(3) - ... ''' - >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'list', # list source from example 2 - ... 'next', # return from g() - ... 'list', # list source from example 1 - ... 'next', # return from f() - ... 'list', # list source from example 3 - ... 'continue', # stop debugging - ... '']) - >>> try: runner.run(test) - ... finally: sys.stdin = real_stdin - ... # doctest: +NORMALIZE_WHITESPACE - --Return-- - > <doctest foo-bar@baz[1]>(3)g()->None - -> import pdb; pdb.set_trace() - (Pdb) list - 1 def g(x): - 2 print(x+3) - 3 -> import pdb; pdb.set_trace() - [EOF] - (Pdb) next - --Return-- - > <doctest foo-bar@baz[0]>(2)f()->None - -> g(x*2) - (Pdb) list - 1 def f(x): - 2 -> g(x*2) - [EOF] - (Pdb) next - --Return-- - > <doctest foo-bar@baz[2]>(1)<module>()->None - -> f(3) - (Pdb) list - 1 -> f(3) - [EOF] - (Pdb) continue - ********************************************************************** - File "foo-bar@baz.py", line 7, in foo-bar@baz - Failed example: - f(3) - Expected nothing - Got: - 9 - TestResults(failed=1, attempted=3) - """ - -def test_pdb_set_trace_nested(): - """This illustrates more-demanding use of set_trace with nested functions. - - >>> class C(object): - ... def calls_set_trace(self): - ... y = 1 - ... import pdb; pdb.set_trace() - ... self.f1() - ... y = 2 - ... def f1(self): - ... x = 1 - ... self.f2() - ... x = 2 - ... def f2(self): - ... z = 1 - ... z = 2 - - >>> calls_set_trace = C().calls_set_trace - - >>> doc = ''' - ... >>> a = 1 - ... >>> calls_set_trace() - ... ''' - >>> parser = doctest.DocTestParser() - >>> runner = doctest.DocTestRunner(verbose=False) - >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) - >>> real_stdin = sys.stdin - >>> sys.stdin = _FakeInput([ - ... 'print(y)', # print data defined in the function - ... 'step', 'step', 'step', 'step', 'step', 'step', 'print(z)', - ... 'up', 'print(x)', - ... 'up', 'print(y)', - ... 'up', 'print(foo)', - ... 'continue', # stop debugging - ... '']) - - >>> try: - ... runner.run(test) - ... finally: - ... sys.stdin = real_stdin - ... # doctest: +REPORT_NDIFF - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() - -> self.f1() - (Pdb) print(y) - 1 - (Pdb) step - --Call-- - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(7)f1() - -> def f1(self): - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(8)f1() - -> x = 1 - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() - -> self.f2() - (Pdb) step - --Call-- - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(11)f2() - -> def f2(self): - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(12)f2() - -> z = 1 - (Pdb) step - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(13)f2() - -> z = 2 - (Pdb) print(z) - 1 - (Pdb) up - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() - -> self.f2() - (Pdb) print(x) - 1 - (Pdb) up - > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() - -> self.f1() - (Pdb) print(y) - 1 - (Pdb) up - > <doctest foo-bar@baz[1]>(1)<module>() - -> calls_set_trace() - (Pdb) print(foo) - *** NameError: name 'foo' is not defined - (Pdb) continue - TestResults(failed=0, attempted=2) -""" +if not hasattr(sys, 'gettrace') or not sys.gettrace(): + def test_pdb_set_trace(): + """Using pdb.set_trace from a doctest. + + You can use pdb.set_trace from a doctest. To do so, you must + retrieve the set_trace function from the pdb module at the time + you use it. The doctest module changes sys.stdout so that it can + capture program output. It also temporarily replaces pdb.set_trace + with a version that restores stdout. This is necessary for you to + see debugger output. + + >>> doc = ''' + ... >>> x = 42 + ... >>> raise Exception('clé') + ... Traceback (most recent call last): + ... Exception: clé + ... >>> import pdb; pdb.set_trace() + ... ''' + >>> parser = doctest.DocTestParser() + >>> test = parser.get_doctest(doc, {}, "foo-bar@baz", "foo-bar@baz.py", 0) + >>> runner = doctest.DocTestRunner(verbose=False) + + To demonstrate this, we'll create a fake standard input that + captures our debugger input: + + >>> import tempfile + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'print(x)', # print data defined by the example + ... 'continue', # stop debugging + ... '']) + + >>> try: runner.run(test) + ... finally: sys.stdin = real_stdin + --Return-- + > <doctest foo-bar@baz[2]>(1)<module>()->None + -> import pdb; pdb.set_trace() + (Pdb) print(x) + 42 + (Pdb) continue + TestResults(failed=0, attempted=3) + + You can also put pdb.set_trace in a function called from a test: + + >>> def calls_set_trace(): + ... y=2 + ... import pdb; pdb.set_trace() + + >>> doc = ''' + ... >>> x=1 + ... >>> calls_set_trace() + ... ''' + >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'print(y)', # print data defined in the function + ... 'up', # out of function + ... 'print(x)', # print data defined by the example + ... 'continue', # stop debugging + ... '']) + + >>> try: + ... runner.run(test) + ... finally: + ... sys.stdin = real_stdin + --Return-- + > <doctest test.test_doctest.test_pdb_set_trace[8]>(3)calls_set_trace()->None + -> import pdb; pdb.set_trace() + (Pdb) print(y) + 2 + (Pdb) up + > <doctest foo-bar@baz[1]>(1)<module>() + -> calls_set_trace() + (Pdb) print(x) + 1 + (Pdb) continue + TestResults(failed=0, attempted=2) + + During interactive debugging, source code is shown, even for + doctest examples: + + >>> doc = ''' + ... >>> def f(x): + ... ... g(x*2) + ... >>> def g(x): + ... ... print(x+3) + ... ... import pdb; pdb.set_trace() + ... >>> f(3) + ... ''' + >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'list', # list source from example 2 + ... 'next', # return from g() + ... 'list', # list source from example 1 + ... 'next', # return from f() + ... 'list', # list source from example 3 + ... 'continue', # stop debugging + ... '']) + >>> try: runner.run(test) + ... finally: sys.stdin = real_stdin + ... # doctest: +NORMALIZE_WHITESPACE + --Return-- + > <doctest foo-bar@baz[1]>(3)g()->None + -> import pdb; pdb.set_trace() + (Pdb) list + 1 def g(x): + 2 print(x+3) + 3 -> import pdb; pdb.set_trace() + [EOF] + (Pdb) next + --Return-- + > <doctest foo-bar@baz[0]>(2)f()->None + -> g(x*2) + (Pdb) list + 1 def f(x): + 2 -> g(x*2) + [EOF] + (Pdb) next + --Return-- + > <doctest foo-bar@baz[2]>(1)<module>()->None + -> f(3) + (Pdb) list + 1 -> f(3) + [EOF] + (Pdb) continue + ********************************************************************** + File "foo-bar@baz.py", line 7, in foo-bar@baz + Failed example: + f(3) + Expected nothing + Got: + 9 + TestResults(failed=1, attempted=3) + """ + + def test_pdb_set_trace_nested(): + """This illustrates more-demanding use of set_trace with nested functions. + + >>> class C(object): + ... def calls_set_trace(self): + ... y = 1 + ... import pdb; pdb.set_trace() + ... self.f1() + ... y = 2 + ... def f1(self): + ... x = 1 + ... self.f2() + ... x = 2 + ... def f2(self): + ... z = 1 + ... z = 2 + + >>> calls_set_trace = C().calls_set_trace + + >>> doc = ''' + ... >>> a = 1 + ... >>> calls_set_trace() + ... ''' + >>> parser = doctest.DocTestParser() + >>> runner = doctest.DocTestRunner(verbose=False) + >>> test = parser.get_doctest(doc, globals(), "foo-bar@baz", "foo-bar@baz.py", 0) + >>> real_stdin = sys.stdin + >>> sys.stdin = _FakeInput([ + ... 'print(y)', # print data defined in the function + ... 'step', 'step', 'step', 'step', 'step', 'step', 'print(z)', + ... 'up', 'print(x)', + ... 'up', 'print(y)', + ... 'up', 'print(foo)', + ... 'continue', # stop debugging + ... '']) + + >>> try: + ... runner.run(test) + ... finally: + ... sys.stdin = real_stdin + ... # doctest: +REPORT_NDIFF + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() + -> self.f1() + (Pdb) print(y) + 1 + (Pdb) step + --Call-- + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(7)f1() + -> def f1(self): + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(8)f1() + -> x = 1 + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() + -> self.f2() + (Pdb) step + --Call-- + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(11)f2() + -> def f2(self): + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(12)f2() + -> z = 1 + (Pdb) step + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(13)f2() + -> z = 2 + (Pdb) print(z) + 1 + (Pdb) up + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(9)f1() + -> self.f2() + (Pdb) print(x) + 1 + (Pdb) up + > <doctest test.test_doctest.test_pdb_set_trace_nested[0]>(5)calls_set_trace() + -> self.f1() + (Pdb) print(y) + 1 + (Pdb) up + > <doctest foo-bar@baz[1]>(1)<module>() + -> calls_set_trace() + (Pdb) print(foo) + *** NameError: name 'foo' is not defined + (Pdb) continue + TestResults(failed=0, attempted=2) + """ def test_DocTestSuite(): """DocTestSuite creates a unittest test suite from a doctest. diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py index c61078d..2fafe1d 100644 --- a/Lib/test/test_dummy_thread.py +++ b/Lib/test/test_dummy_thread.py @@ -35,8 +35,8 @@ class LockTests(unittest.TestCase): "Lock object did not release properly.") def test_improper_release(self): - #Make sure release of an unlocked thread raises _thread.error - self.assertRaises(_thread.error, self.lock.release) + #Make sure release of an unlocked thread raises RuntimeError + self.assertRaises(RuntimeError, self.lock.release) def test_cond_acquire_success(self): #Make sure the conditional acquiring of the lock works. diff --git a/Lib/test/test_email.py b/Lib/test/test_email.py deleted file mode 100644 index 5eebba5..0000000 --- a/Lib/test/test_email.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (C) 2001-2007 Python Software Foundation -# email package unit tests - -# The specific tests now live in Lib/email/test -from email.test.test_email import suite -from email.test.test_email_codecs import suite as codecs_suite -from test import support - -def test_main(): - support.run_unittest(suite()) - support.run_unittest(codecs_suite()) - -if __name__ == '__main__': - test_main() diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py new file mode 100644 index 0000000..5aae093 --- /dev/null +++ b/Lib/test/test_email/__init__.py @@ -0,0 +1,13 @@ +import os +import sys +import unittest +import test.support + +# used by regrtest and __main__. +def test_main(): + here = os.path.dirname(__file__) + # Unittest mucks with the path, so we have to save and restore + # it to keep regrtest happy. + savepath = sys.path[:] + test.support._run_suite(unittest.defaultTestLoader.discover(here)) + sys.path[:] = savepath diff --git a/Lib/email/test/data/PyBanner048.gif b/Lib/test/test_email/data/PyBanner048.gif Binary files differindex 1a5c87f..1a5c87f 100644 --- a/Lib/email/test/data/PyBanner048.gif +++ b/Lib/test/test_email/data/PyBanner048.gif diff --git a/Lib/email/test/data/audiotest.au b/Lib/test/test_email/data/audiotest.au Binary files differindex f76b050..f76b050 100644 --- a/Lib/email/test/data/audiotest.au +++ b/Lib/test/test_email/data/audiotest.au diff --git a/Lib/email/test/data/msg_01.txt b/Lib/test/test_email/data/msg_01.txt index 7e33bcf..7e33bcf 100644 --- a/Lib/email/test/data/msg_01.txt +++ b/Lib/test/test_email/data/msg_01.txt diff --git a/Lib/email/test/data/msg_02.txt b/Lib/test/test_email/data/msg_02.txt index 43f2480..43f2480 100644 --- a/Lib/email/test/data/msg_02.txt +++ b/Lib/test/test_email/data/msg_02.txt diff --git a/Lib/email/test/data/msg_03.txt b/Lib/test/test_email/data/msg_03.txt index c748ebf..c748ebf 100644 --- a/Lib/email/test/data/msg_03.txt +++ b/Lib/test/test_email/data/msg_03.txt diff --git a/Lib/email/test/data/msg_04.txt b/Lib/test/test_email/data/msg_04.txt index 1f633c4..1f633c4 100644 --- a/Lib/email/test/data/msg_04.txt +++ b/Lib/test/test_email/data/msg_04.txt diff --git a/Lib/email/test/data/msg_05.txt b/Lib/test/test_email/data/msg_05.txt index 87d5e9c..87d5e9c 100644 --- a/Lib/email/test/data/msg_05.txt +++ b/Lib/test/test_email/data/msg_05.txt diff --git a/Lib/email/test/data/msg_06.txt b/Lib/test/test_email/data/msg_06.txt index 69f3a47..69f3a47 100644 --- a/Lib/email/test/data/msg_06.txt +++ b/Lib/test/test_email/data/msg_06.txt diff --git a/Lib/email/test/data/msg_07.txt b/Lib/test/test_email/data/msg_07.txt index 721f3a0..721f3a0 100644 --- a/Lib/email/test/data/msg_07.txt +++ b/Lib/test/test_email/data/msg_07.txt diff --git a/Lib/email/test/data/msg_08.txt b/Lib/test/test_email/data/msg_08.txt index b563083..b563083 100644 --- a/Lib/email/test/data/msg_08.txt +++ b/Lib/test/test_email/data/msg_08.txt diff --git a/Lib/email/test/data/msg_09.txt b/Lib/test/test_email/data/msg_09.txt index 575c4c2..575c4c2 100644 --- a/Lib/email/test/data/msg_09.txt +++ b/Lib/test/test_email/data/msg_09.txt diff --git a/Lib/email/test/data/msg_10.txt b/Lib/test/test_email/data/msg_10.txt index 0790396..0790396 100644 --- a/Lib/email/test/data/msg_10.txt +++ b/Lib/test/test_email/data/msg_10.txt diff --git a/Lib/email/test/data/msg_11.txt b/Lib/test/test_email/data/msg_11.txt index 8f7f199..8f7f199 100644 --- a/Lib/email/test/data/msg_11.txt +++ b/Lib/test/test_email/data/msg_11.txt diff --git a/Lib/email/test/data/msg_12.txt b/Lib/test/test_email/data/msg_12.txt index 4bec8d9..4bec8d9 100644 --- a/Lib/email/test/data/msg_12.txt +++ b/Lib/test/test_email/data/msg_12.txt diff --git a/Lib/email/test/data/msg_12a.txt b/Lib/test/test_email/data/msg_12a.txt index e94224e..e94224e 100644 --- a/Lib/email/test/data/msg_12a.txt +++ b/Lib/test/test_email/data/msg_12a.txt diff --git a/Lib/email/test/data/msg_13.txt b/Lib/test/test_email/data/msg_13.txt index 8e6d52d..8e6d52d 100644 --- a/Lib/email/test/data/msg_13.txt +++ b/Lib/test/test_email/data/msg_13.txt diff --git a/Lib/email/test/data/msg_14.txt b/Lib/test/test_email/data/msg_14.txt index 5d98d2f..5d98d2f 100644 --- a/Lib/email/test/data/msg_14.txt +++ b/Lib/test/test_email/data/msg_14.txt diff --git a/Lib/email/test/data/msg_15.txt b/Lib/test/test_email/data/msg_15.txt index 0025624..0025624 100644 --- a/Lib/email/test/data/msg_15.txt +++ b/Lib/test/test_email/data/msg_15.txt diff --git a/Lib/email/test/data/msg_16.txt b/Lib/test/test_email/data/msg_16.txt index 56167e9..56167e9 100644 --- a/Lib/email/test/data/msg_16.txt +++ b/Lib/test/test_email/data/msg_16.txt diff --git a/Lib/email/test/data/msg_17.txt b/Lib/test/test_email/data/msg_17.txt index 8d86e41..8d86e41 100644 --- a/Lib/email/test/data/msg_17.txt +++ b/Lib/test/test_email/data/msg_17.txt diff --git a/Lib/email/test/data/msg_18.txt b/Lib/test/test_email/data/msg_18.txt index f9f4904..f9f4904 100644 --- a/Lib/email/test/data/msg_18.txt +++ b/Lib/test/test_email/data/msg_18.txt diff --git a/Lib/email/test/data/msg_19.txt b/Lib/test/test_email/data/msg_19.txt index 49bf7fc..49bf7fc 100644 --- a/Lib/email/test/data/msg_19.txt +++ b/Lib/test/test_email/data/msg_19.txt diff --git a/Lib/email/test/data/msg_20.txt b/Lib/test/test_email/data/msg_20.txt index 1a6a887..1a6a887 100644 --- a/Lib/email/test/data/msg_20.txt +++ b/Lib/test/test_email/data/msg_20.txt diff --git a/Lib/email/test/data/msg_21.txt b/Lib/test/test_email/data/msg_21.txt index 23590b2..23590b2 100644 --- a/Lib/email/test/data/msg_21.txt +++ b/Lib/test/test_email/data/msg_21.txt diff --git a/Lib/email/test/data/msg_22.txt b/Lib/test/test_email/data/msg_22.txt index af9de5f..af9de5f 100644 --- a/Lib/email/test/data/msg_22.txt +++ b/Lib/test/test_email/data/msg_22.txt diff --git a/Lib/email/test/data/msg_23.txt b/Lib/test/test_email/data/msg_23.txt index bb2e8ec..bb2e8ec 100644 --- a/Lib/email/test/data/msg_23.txt +++ b/Lib/test/test_email/data/msg_23.txt diff --git a/Lib/email/test/data/msg_24.txt b/Lib/test/test_email/data/msg_24.txt index 4e52339..4e52339 100644 --- a/Lib/email/test/data/msg_24.txt +++ b/Lib/test/test_email/data/msg_24.txt diff --git a/Lib/email/test/data/msg_25.txt b/Lib/test/test_email/data/msg_25.txt index 9e35275..9e35275 100644 --- a/Lib/email/test/data/msg_25.txt +++ b/Lib/test/test_email/data/msg_25.txt diff --git a/Lib/email/test/data/msg_26.txt b/Lib/test/test_email/data/msg_26.txt index 58efaa9..58efaa9 100644 --- a/Lib/email/test/data/msg_26.txt +++ b/Lib/test/test_email/data/msg_26.txt diff --git a/Lib/email/test/data/msg_27.txt b/Lib/test/test_email/data/msg_27.txt index d019176..d019176 100644 --- a/Lib/email/test/data/msg_27.txt +++ b/Lib/test/test_email/data/msg_27.txt diff --git a/Lib/email/test/data/msg_28.txt b/Lib/test/test_email/data/msg_28.txt index 1e4824c..1e4824c 100644 --- a/Lib/email/test/data/msg_28.txt +++ b/Lib/test/test_email/data/msg_28.txt diff --git a/Lib/email/test/data/msg_29.txt b/Lib/test/test_email/data/msg_29.txt index 1fab561..1fab561 100644 --- a/Lib/email/test/data/msg_29.txt +++ b/Lib/test/test_email/data/msg_29.txt diff --git a/Lib/email/test/data/msg_30.txt b/Lib/test/test_email/data/msg_30.txt index 4334bb6..4334bb6 100644 --- a/Lib/email/test/data/msg_30.txt +++ b/Lib/test/test_email/data/msg_30.txt diff --git a/Lib/email/test/data/msg_31.txt b/Lib/test/test_email/data/msg_31.txt index 1e58e56..1e58e56 100644 --- a/Lib/email/test/data/msg_31.txt +++ b/Lib/test/test_email/data/msg_31.txt diff --git a/Lib/email/test/data/msg_32.txt b/Lib/test/test_email/data/msg_32.txt index 07ec5af..07ec5af 100644 --- a/Lib/email/test/data/msg_32.txt +++ b/Lib/test/test_email/data/msg_32.txt diff --git a/Lib/email/test/data/msg_33.txt b/Lib/test/test_email/data/msg_33.txt index 042787a..042787a 100644 --- a/Lib/email/test/data/msg_33.txt +++ b/Lib/test/test_email/data/msg_33.txt diff --git a/Lib/email/test/data/msg_34.txt b/Lib/test/test_email/data/msg_34.txt index 055dfea..055dfea 100644 --- a/Lib/email/test/data/msg_34.txt +++ b/Lib/test/test_email/data/msg_34.txt diff --git a/Lib/email/test/data/msg_35.txt b/Lib/test/test_email/data/msg_35.txt index be7d5a2..be7d5a2 100644 --- a/Lib/email/test/data/msg_35.txt +++ b/Lib/test/test_email/data/msg_35.txt diff --git a/Lib/email/test/data/msg_36.txt b/Lib/test/test_email/data/msg_36.txt index 5632c30..5632c30 100644 --- a/Lib/email/test/data/msg_36.txt +++ b/Lib/test/test_email/data/msg_36.txt diff --git a/Lib/email/test/data/msg_37.txt b/Lib/test/test_email/data/msg_37.txt index 038d34a..038d34a 100644 --- a/Lib/email/test/data/msg_37.txt +++ b/Lib/test/test_email/data/msg_37.txt diff --git a/Lib/email/test/data/msg_38.txt b/Lib/test/test_email/data/msg_38.txt index 006df81..006df81 100644 --- a/Lib/email/test/data/msg_38.txt +++ b/Lib/test/test_email/data/msg_38.txt diff --git a/Lib/email/test/data/msg_39.txt b/Lib/test/test_email/data/msg_39.txt index 124b269..124b269 100644 --- a/Lib/email/test/data/msg_39.txt +++ b/Lib/test/test_email/data/msg_39.txt diff --git a/Lib/email/test/data/msg_40.txt b/Lib/test/test_email/data/msg_40.txt index 1435fa1..1435fa1 100644 --- a/Lib/email/test/data/msg_40.txt +++ b/Lib/test/test_email/data/msg_40.txt diff --git a/Lib/email/test/data/msg_41.txt b/Lib/test/test_email/data/msg_41.txt index 76cdd1c..76cdd1c 100644 --- a/Lib/email/test/data/msg_41.txt +++ b/Lib/test/test_email/data/msg_41.txt diff --git a/Lib/email/test/data/msg_42.txt b/Lib/test/test_email/data/msg_42.txt index a75f8f4..a75f8f4 100644 --- a/Lib/email/test/data/msg_42.txt +++ b/Lib/test/test_email/data/msg_42.txt diff --git a/Lib/email/test/data/msg_43.txt b/Lib/test/test_email/data/msg_43.txt index 797d12c..797d12c 100644 --- a/Lib/email/test/data/msg_43.txt +++ b/Lib/test/test_email/data/msg_43.txt diff --git a/Lib/email/test/data/msg_44.txt b/Lib/test/test_email/data/msg_44.txt index 15a2252..15a2252 100644 --- a/Lib/email/test/data/msg_44.txt +++ b/Lib/test/test_email/data/msg_44.txt diff --git a/Lib/email/test/data/msg_45.txt b/Lib/test/test_email/data/msg_45.txt index 58fde95..58fde95 100644 --- a/Lib/email/test/data/msg_45.txt +++ b/Lib/test/test_email/data/msg_45.txt diff --git a/Lib/email/test/data/msg_46.txt b/Lib/test/test_email/data/msg_46.txt index 1e22c4f..1e22c4f 100644 --- a/Lib/email/test/data/msg_46.txt +++ b/Lib/test/test_email/data/msg_46.txt diff --git a/Lib/email/test/test_email_codecs.py b/Lib/test/test_email/test_asian_codecs.py index ca85f57..a4dd9a9 100644 --- a/Lib/email/test/test_email_codecs.py +++ b/Lib/test/test_email/test_asian_codecs.py @@ -5,7 +5,7 @@ import unittest from test.support import run_unittest -from email.test.test_email import TestEmailBase +from test.test_email.test_email import TestEmailBase from email.charset import Charset from email.header import Header, decode_header from email.message import Message @@ -78,16 +78,5 @@ Hello World! =?iso-2022-jp?b?GyRCJU8lbSE8JW8hPCVrJUkhKhsoQg==?= -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(TestEmailAsianCodecs)) - return suite - - -def test_main(): - run_unittest(TestEmailAsianCodecs) - - - if __name__ == '__main__': - unittest.main(defaultTest='suite') + unittest.main() diff --git a/Lib/email/test/test_email.py b/Lib/test/test_email/test_email.py index 2ff2a8f..bc6a309 100644 --- a/Lib/email/test/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -37,7 +37,7 @@ from email import base64mime from email import quoprimime from test.support import findfile, run_unittest, unlink -from email.test import __file__ as landmark +from test.test_email import __file__ as landmark NL = '\n' @@ -2352,6 +2352,13 @@ class TestMiscellaneous(TestEmailBase): (2002, 4, 3, 14, 58, 26, 0, 1, -1, -28800)) + def test_parsedate_accepts_time_with_dots(self): + eq = self.assertEqual + eq(utils.parsedate_tz('5 Feb 2003 13.47.26 -0800'), + (2003, 2, 5, 13, 47, 26, 0, 1, -1, -28800)) + eq(utils.parsedate_tz('5 Feb 2003 13.47 -0800'), + (2003, 2, 5, 13, 47, 0, 0, 1, -1, -28800)) + def test_parsedate_acceptable_to_time_functions(self): eq = self.assertEqual timetup = utils.parsedate('5 Feb 2003 13:47:26 -0800') @@ -4314,23 +4321,5 @@ class TestSigned(TestEmailBase): -def _testclasses(): - mod = sys.modules[__name__] - return [getattr(mod, name) for name in dir(mod) if name.startswith('Test')] - - -def suite(): - suite = unittest.TestSuite() - for testclass in _testclasses(): - suite.addTest(unittest.makeSuite(testclass)) - return suite - - -def test_main(): - for testclass in _testclasses(): - run_unittest(testclass) - - - if __name__ == '__main__': - unittest.main(defaultTest='suite') + unittest.main() diff --git a/Lib/email/test/test_email_torture.py b/Lib/test/test_email/torture_test.py index 544b1bb..544b1bb 100644 --- a/Lib/email/test/test_email_torture.py +++ b/Lib/test/test_email/torture_test.py diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py index 76f4249..592c765 100644 --- a/Lib/test/test_exceptions.py +++ b/Lib/test/test_exceptions.py @@ -7,7 +7,7 @@ import pickle import weakref from test.support import (TESTFN, unlink, run_unittest, captured_output, - gc_collect, cpython_only) + gc_collect, cpython_only, no_tracing) # XXX This is not really enough, each *operation* should be tested! @@ -388,6 +388,7 @@ class ExceptionTests(unittest.TestCase): x = DerivedException(fancy_arg=42) self.assertEqual(x.fancy_arg, 42) + @no_tracing def testInfiniteRecursion(self): def f(): return f() @@ -631,6 +632,7 @@ class ExceptionTests(unittest.TestCase): u.start = 1000 self.assertEqual(str(u), "can't translate characters in position 1000-4: 965230951443685724997") + @no_tracing def test_badisinstance(self): # Bug #2542: if issubclass(e, MyException) raises an exception, # it should be ignored @@ -741,6 +743,7 @@ class ExceptionTests(unittest.TestCase): self.fail("MemoryError not raised") self.assertEqual(wr(), None) + @no_tracing def test_recursion_error_cleanup(self): # Same test as above, but with "recursion exceeded" errors class C: diff --git a/Lib/test/test_fileinput.py b/Lib/test/test_fileinput.py index f312882..76e4d16 100644 --- a/Lib/test/test_fileinput.py +++ b/Lib/test/test_fileinput.py @@ -2,18 +2,31 @@ Tests for fileinput module. Nick Mathewson ''' - +import os +import sys +import re +import fileinput +import collections +import gzip +import types +import codecs import unittest -from test.support import verbose, TESTFN, run_unittest -from test.support import unlink as safe_unlink -import sys, re + +try: + import bz2 +except ImportError: + bz2 = None + from io import StringIO from fileinput import FileInput, hook_encoded +from test.support import verbose, TESTFN, run_unittest +from test.support import unlink as safe_unlink + + # The fileinput module has 2 interfaces: the FileInput class which does # all the work, and a few functions (input, etc.) that use a global _state -# variable. We only test the FileInput class, since the other functions -# only provide a thin facade over FileInput. +# variable. # Write lines (a list of lines) to temp file number i, and return the # temp file's name. @@ -121,7 +134,16 @@ class BufferSizesTests(unittest.TestCase): self.assertEqual(int(m.group(1)), fi.filelineno()) fi.close() +class UnconditionallyRaise: + def __init__(self, exception_type): + self.exception_type = exception_type + self.invoked = False + def __call__(self, *args, **kwargs): + self.invoked = True + raise self.exception_type() + class FileInputTests(unittest.TestCase): + def test_zero_byte_files(self): t1 = t2 = t3 = t4 = None try: @@ -219,17 +241,20 @@ class FileInputTests(unittest.TestCase): self.fail("FileInput should check openhook for being callable") except ValueError: pass - # XXX The rot13 codec was removed. - # So this test needs to be changed to use something else. - # (Or perhaps the API needs to change so we can just pass - # an encoding rather than using a hook?) -## try: -## t1 = writeTmp(1, ["A\nB"], mode="wb") -## fi = FileInput(files=t1, openhook=hook_encoded("rot13")) -## lines = list(fi) -## self.assertEqual(lines, ["N\n", "O"]) -## finally: -## remove_tempfiles(t1) + + class CustomOpenHook: + def __init__(self): + self.invoked = False + def __call__(self, *args): + self.invoked = True + return open(*args) + + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + custom_open_hook = CustomOpenHook() + with FileInput([t], openhook=custom_open_hook) as fi: + fi.readline() + self.assertTrue(custom_open_hook.invoked, "openhook not invoked") def test_context_manager(self): try: @@ -254,9 +279,584 @@ class FileInputTests(unittest.TestCase): finally: remove_tempfiles(t1) + def test_empty_files_list_specified_to_constructor(self): + with FileInput(files=[]) as fi: + self.assertEqual(fi._files, ('-',)) + + def test__getitem__(self): + """Tests invoking FileInput.__getitem__() with the current + line number""" + t = writeTmp(1, ["line1\n", "line2\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + retval1 = fi[0] + self.assertEqual(retval1, "line1\n") + retval2 = fi[1] + self.assertEqual(retval2, "line2\n") + + def test__getitem__invalid_key(self): + """Tests invoking FileInput.__getitem__() with an index unequal to + the line number""" + t = writeTmp(1, ["line1\n", "line2\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + with self.assertRaises(RuntimeError) as cm: + fi[1] + self.assertEqual(cm.exception.args, ("accessing lines out of order",)) + + def test__getitem__eof(self): + """Tests invoking FileInput.__getitem__() with the line number but at + end-of-input""" + t = writeTmp(1, []) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + with self.assertRaises(IndexError) as cm: + fi[0] + self.assertEqual(cm.exception.args, ("end of input reached",)) + + def test_nextfile_oserror_deleting_backup(self): + """Tests invoking FileInput.nextfile() when the attempt to delete + the backup file would raise OSError. This error is expected to be + silently ignored""" + + os_unlink_orig = os.unlink + os_unlink_replacement = UnconditionallyRaise(OSError) + try: + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t], inplace=True) as fi: + next(fi) # make sure the file is opened + os.unlink = os_unlink_replacement + fi.nextfile() + finally: + os.unlink = os_unlink_orig + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(os_unlink_replacement.invoked, + "os.unlink() was not invoked") + + def test_readline_os_fstat_raises_OSError(self): + """Tests invoking FileInput.readline() when os.fstat() raises OSError. + This exception should be silently discarded.""" + + os_fstat_orig = os.fstat + os_fstat_replacement = UnconditionallyRaise(OSError) + try: + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t], inplace=True) as fi: + os.fstat = os_fstat_replacement + fi.readline() + finally: + os.fstat = os_fstat_orig + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(os_fstat_replacement.invoked, + "os.fstat() was not invoked") + + @unittest.skipIf(not hasattr(os, "chmod"), "os.chmod does not exist") + def test_readline_os_chmod_raises_OSError(self): + """Tests invoking FileInput.readline() when os.chmod() raises OSError. + This exception should be silently discarded.""" + + os_chmod_orig = os.chmod + os_chmod_replacement = UnconditionallyRaise(OSError) + try: + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t], inplace=True) as fi: + os.chmod = os_chmod_replacement + fi.readline() + finally: + os.chmod = os_chmod_orig + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(os_chmod_replacement.invoked, + "os.fstat() was not invoked") + + def test_fileno_when_ValueError_raised(self): + class FilenoRaisesValueError(UnconditionallyRaise): + def __init__(self): + UnconditionallyRaise.__init__(self, ValueError) + def fileno(self): + self.__call__() + + unconditionally_raise_ValueError = FilenoRaisesValueError() + t = writeTmp(1, ["\n"]) + self.addCleanup(remove_tempfiles, t) + with FileInput(files=[t]) as fi: + file_backup = fi._file + try: + fi._file = unconditionally_raise_ValueError + result = fi.fileno() + finally: + fi._file = file_backup # make sure the file gets cleaned up + + # sanity check to make sure that our test scenario was actually hit + self.assertTrue(unconditionally_raise_ValueError.invoked, + "_file.fileno() was not invoked") + + self.assertEqual(result, -1, "fileno() should return -1") + +class MockFileInput: + """A class that mocks out fileinput.FileInput for use during unit tests""" + + def __init__(self, files=None, inplace=False, backup="", bufsize=0, + mode="r", openhook=None): + self.files = files + self.inplace = inplace + self.backup = backup + self.bufsize = bufsize + self.mode = mode + self.openhook = openhook + self._file = None + self.invocation_counts = collections.defaultdict(lambda: 0) + self.return_values = {} + + def close(self): + self.invocation_counts["close"] += 1 + + def nextfile(self): + self.invocation_counts["nextfile"] += 1 + return self.return_values["nextfile"] + + def filename(self): + self.invocation_counts["filename"] += 1 + return self.return_values["filename"] + + def lineno(self): + self.invocation_counts["lineno"] += 1 + return self.return_values["lineno"] + + def filelineno(self): + self.invocation_counts["filelineno"] += 1 + return self.return_values["filelineno"] + + def fileno(self): + self.invocation_counts["fileno"] += 1 + return self.return_values["fileno"] + + def isfirstline(self): + self.invocation_counts["isfirstline"] += 1 + return self.return_values["isfirstline"] + + def isstdin(self): + self.invocation_counts["isstdin"] += 1 + return self.return_values["isstdin"] + +class BaseFileInputGlobalMethodsTest(unittest.TestCase): + """Base class for unit tests for the global function of + the fileinput module.""" + + def setUp(self): + self._orig_state = fileinput._state + self._orig_FileInput = fileinput.FileInput + fileinput.FileInput = MockFileInput + + def tearDown(self): + fileinput.FileInput = self._orig_FileInput + fileinput._state = self._orig_state + + def assertExactlyOneInvocation(self, mock_file_input, method_name): + # assert that the method with the given name was invoked once + actual_count = mock_file_input.invocation_counts[method_name] + self.assertEqual(actual_count, 1, method_name) + # assert that no other unexpected methods were invoked + actual_total_count = len(mock_file_input.invocation_counts) + self.assertEqual(actual_total_count, 1) + +class Test_fileinput_input(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.input()""" + + def test_state_is_not_None_and_state_file_is_not_None(self): + """Tests invoking fileinput.input() when fileinput._state is not None + and its _file attribute is also not None. Expect RuntimeError to + be raised with a meaningful error message and for fileinput._state + to *not* be modified.""" + instance = MockFileInput() + instance._file = object() + fileinput._state = instance + with self.assertRaises(RuntimeError) as cm: + fileinput.input() + self.assertEqual(("input() already active",), cm.exception.args) + self.assertIs(instance, fileinput._state, "fileinput._state") + + def test_state_is_not_None_and_state_file_is_None(self): + """Tests invoking fileinput.input() when fileinput._state is not None + but its _file attribute *is* None. Expect it to create and return + a new fileinput.FileInput object with all method parameters passed + explicitly to the __init__() method; also ensure that + fileinput._state is set to the returned instance.""" + instance = MockFileInput() + instance._file = None + fileinput._state = instance + self.do_test_call_input() + + def test_state_is_None(self): + """Tests invoking fileinput.input() when fileinput._state is None + Expect it to create and return a new fileinput.FileInput object + with all method parameters passed explicitly to the __init__() + method; also ensure that fileinput._state is set to the returned + instance.""" + fileinput._state = None + self.do_test_call_input() + + def do_test_call_input(self): + """Tests that fileinput.input() creates a new fileinput.FileInput + object, passing the given parameters unmodified to + fileinput.FileInput.__init__(). Note that this test depends on the + monkey patching of fileinput.FileInput done by setUp().""" + files = object() + inplace = object() + backup = object() + bufsize = object() + mode = object() + openhook = object() + + # call fileinput.input() with different values for each argument + result = fileinput.input(files=files, inplace=inplace, backup=backup, + bufsize=bufsize, + mode=mode, openhook=openhook) + + # ensure fileinput._state was set to the returned object + self.assertIs(result, fileinput._state, "fileinput._state") + + # ensure the parameters to fileinput.input() were passed directly + # to FileInput.__init__() + self.assertIs(files, result.files, "files") + self.assertIs(inplace, result.inplace, "inplace") + self.assertIs(backup, result.backup, "backup") + self.assertIs(bufsize, result.bufsize, "bufsize") + self.assertIs(mode, result.mode, "mode") + self.assertIs(openhook, result.openhook, "openhook") + +class Test_fileinput_close(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.close()""" + + def test_state_is_None(self): + """Tests that fileinput.close() does nothing if fileinput._state + is None""" + fileinput._state = None + fileinput.close() + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests that fileinput.close() invokes close() on fileinput._state + and sets _state=None""" + instance = MockFileInput() + fileinput._state = instance + fileinput.close() + self.assertExactlyOneInvocation(instance, "close") + self.assertIsNone(fileinput._state) + +class Test_fileinput_nextfile(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.nextfile()""" + + def test_state_is_None(self): + """Tests fileinput.nextfile() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.nextfile() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.nextfile() when fileinput._state is not None. + Ensure that it invokes fileinput._state.nextfile() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + nextfile_retval = object() + instance = MockFileInput() + instance.return_values["nextfile"] = nextfile_retval + fileinput._state = instance + retval = fileinput.nextfile() + self.assertExactlyOneInvocation(instance, "nextfile") + self.assertIs(retval, nextfile_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_filename(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.filename()""" + + def test_state_is_None(self): + """Tests fileinput.filename() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.filename() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.filename() when fileinput._state is not None. + Ensure that it invokes fileinput._state.filename() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + filename_retval = object() + instance = MockFileInput() + instance.return_values["filename"] = filename_retval + fileinput._state = instance + retval = fileinput.filename() + self.assertExactlyOneInvocation(instance, "filename") + self.assertIs(retval, filename_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_lineno(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.lineno()""" + + def test_state_is_None(self): + """Tests fileinput.lineno() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.lineno() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.lineno() when fileinput._state is not None. + Ensure that it invokes fileinput._state.lineno() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + lineno_retval = object() + instance = MockFileInput() + instance.return_values["lineno"] = lineno_retval + fileinput._state = instance + retval = fileinput.lineno() + self.assertExactlyOneInvocation(instance, "lineno") + self.assertIs(retval, lineno_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_filelineno(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.filelineno()""" + + def test_state_is_None(self): + """Tests fileinput.filelineno() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.filelineno() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.filelineno() when fileinput._state is not None. + Ensure that it invokes fileinput._state.filelineno() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + filelineno_retval = object() + instance = MockFileInput() + instance.return_values["filelineno"] = filelineno_retval + fileinput._state = instance + retval = fileinput.filelineno() + self.assertExactlyOneInvocation(instance, "filelineno") + self.assertIs(retval, filelineno_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_fileno(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.fileno()""" + + def test_state_is_None(self): + """Tests fileinput.fileno() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.fileno() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.fileno() when fileinput._state is not None. + Ensure that it invokes fileinput._state.fileno() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + fileno_retval = object() + instance = MockFileInput() + instance.return_values["fileno"] = fileno_retval + instance.fileno_retval = fileno_retval + fileinput._state = instance + retval = fileinput.fileno() + self.assertExactlyOneInvocation(instance, "fileno") + self.assertIs(retval, fileno_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_isfirstline(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.isfirstline()""" + + def test_state_is_None(self): + """Tests fileinput.isfirstline() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.isfirstline() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.isfirstline() when fileinput._state is not None. + Ensure that it invokes fileinput._state.isfirstline() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + isfirstline_retval = object() + instance = MockFileInput() + instance.return_values["isfirstline"] = isfirstline_retval + fileinput._state = instance + retval = fileinput.isfirstline() + self.assertExactlyOneInvocation(instance, "isfirstline") + self.assertIs(retval, isfirstline_retval) + self.assertIs(fileinput._state, instance) + +class Test_fileinput_isstdin(BaseFileInputGlobalMethodsTest): + """Unit tests for fileinput.isstdin()""" + + def test_state_is_None(self): + """Tests fileinput.isstdin() when fileinput._state is None. + Ensure that it raises RuntimeError with a meaningful error message + and does not modify fileinput._state""" + fileinput._state = None + with self.assertRaises(RuntimeError) as cm: + fileinput.isstdin() + self.assertEqual(("no active input()",), cm.exception.args) + self.assertIsNone(fileinput._state) + + def test_state_is_not_None(self): + """Tests fileinput.isstdin() when fileinput._state is not None. + Ensure that it invokes fileinput._state.isstdin() exactly once, + returns whatever it returns, and does not modify fileinput._state + to point to a different object.""" + isstdin_retval = object() + instance = MockFileInput() + instance.return_values["isstdin"] = isstdin_retval + fileinput._state = instance + retval = fileinput.isstdin() + self.assertExactlyOneInvocation(instance, "isstdin") + self.assertIs(retval, isstdin_retval) + self.assertIs(fileinput._state, instance) + +class InvocationRecorder: + def __init__(self): + self.invocation_count = 0 + def __call__(self, *args, **kwargs): + self.invocation_count += 1 + self.last_invocation = (args, kwargs) + +class Test_hook_compressed(unittest.TestCase): + """Unit tests for fileinput.hook_compressed()""" + + def setUp(self): + self.fake_open = InvocationRecorder() + + def test_empty_string(self): + self.do_test_use_builtin_open("", 1) + + def test_no_ext(self): + self.do_test_use_builtin_open("abcd", 2) + + def test_gz_ext_fake(self): + original_open = gzip.open + gzip.open = self.fake_open + try: + result = fileinput.hook_compressed("test.gz", 3) + finally: + gzip.open = original_open + + self.assertEqual(self.fake_open.invocation_count, 1) + self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {})) + + @unittest.skipUnless(bz2, "Requires bz2") + def test_bz2_ext_fake(self): + original_open = bz2.BZ2File + bz2.BZ2File = self.fake_open + try: + result = fileinput.hook_compressed("test.bz2", 4) + finally: + bz2.BZ2File = original_open + + self.assertEqual(self.fake_open.invocation_count, 1) + self.assertEqual(self.fake_open.last_invocation, (("test.bz2", 4), {})) + + def test_blah_ext(self): + self.do_test_use_builtin_open("abcd.blah", 5) + + def test_gz_ext_builtin(self): + self.do_test_use_builtin_open("abcd.Gz", 6) + + def test_bz2_ext_builtin(self): + self.do_test_use_builtin_open("abcd.Bz2", 7) + + def do_test_use_builtin_open(self, filename, mode): + original_open = self.replace_builtin_open(self.fake_open) + try: + result = fileinput.hook_compressed(filename, mode) + finally: + self.replace_builtin_open(original_open) + + self.assertEqual(self.fake_open.invocation_count, 1) + self.assertEqual(self.fake_open.last_invocation, + ((filename, mode), {})) + + @staticmethod + def replace_builtin_open(new_open_func): + builtins_type = type(__builtins__) + if builtins_type is dict: + original_open = __builtins__["open"] + __builtins__["open"] = new_open_func + elif builtins_type is types.ModuleType: + original_open = __builtins__.open + __builtins__.open = new_open_func + else: + raise RuntimeError( + "unknown __builtins__ type: %r (unable to replace open)" % + builtins_type) + + return original_open + +class Test_hook_encoded(unittest.TestCase): + """Unit tests for fileinput.hook_encoded()""" + + def test(self): + encoding = object() + result = fileinput.hook_encoded(encoding) + + fake_open = InvocationRecorder() + original_open = codecs.open + codecs.open = fake_open + try: + filename = object() + mode = object() + open_result = result(filename, mode) + finally: + codecs.open = original_open + + self.assertEqual(fake_open.invocation_count, 1) + + args = fake_open.last_invocation[0] + self.assertIs(args[0], filename) + self.assertIs(args[1], mode) + self.assertIs(args[2], encoding) def test_main(): - run_unittest(BufferSizesTests, FileInputTests) + run_unittest( + BufferSizesTests, + FileInputTests, + Test_fileinput_input, + Test_fileinput_close, + Test_fileinput_nextfile, + Test_fileinput_filename, + Test_fileinput_lineno, + Test_fileinput_filelineno, + Test_fileinput_fileno, + Test_fileinput_isfirstline, + Test_fileinput_isstdin, + Test_hook_compressed, + Test_hook_encoded, + ) if __name__ == "__main__": test_main() diff --git a/Lib/test/test_float.py b/Lib/test/test_float.py index 1968b8a..4e6f854 100644 --- a/Lib/test/test_float.py +++ b/Lib/test/test_float.py @@ -88,7 +88,7 @@ class GeneralFloatCases(unittest.TestCase): self.assertRaises(ValueError, float, " -0x3.p-1 ") self.assertRaises(ValueError, float, " +0x3.p-1 ") self.assertEqual(float(" 25.e-1 "), 2.5) - self.assertEqual(support.fcmp(float(" .25e-1 "), .025), 0) + self.assertAlmostEqual(float(" .25e-1 "), .025) def test_floatconversion(self): # Make sure that calls to __float__() work properly diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index 9d2eab7..127b1b9 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -608,6 +608,20 @@ class TestFTPClass(TestCase): self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') self.assertFalse(is_client_connected()) + def test_source_address(self): + self.client.quit() + port = support.find_unused_port() + self.client.connect(self.server.host, self.server.port, + source_address=(HOST, port)) + self.assertEqual(self.client.sock.getsockname()[1], port) + self.client.quit() + + def test_source_address_passive_connection(self): + port = support.find_unused_port() + self.client.source_address = (HOST, port) + with self.client.transfercmd('list') as sock: + self.assertEqual(sock.getsockname()[1], port) + def test_parse257(self): self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar') self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar') diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index 0e5a397..f60d5d9 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -1,5 +1,6 @@ import unittest -from test.support import verbose, run_unittest, strip_python_stderr +from test.support import (verbose, refcount_test, run_unittest, + strip_python_stderr) import sys import gc import weakref @@ -175,6 +176,7 @@ class GCTests(unittest.TestCase): del d self.assertEqual(gc.collect(), 2) + @refcount_test def test_frame(self): def f(): frame = sys._getframe() @@ -242,6 +244,7 @@ class GCTests(unittest.TestCase): # For example: # - disposed tuples are not freed, but reused # - the call to assertEqual somehow avoids building its args tuple + @refcount_test def test_get_count(self): # Avoid future allocation of method object assertEqual = self._baseAssertEqual @@ -252,6 +255,7 @@ class GCTests(unittest.TestCase): # the dict, and the tuple returned by get_count() assertEqual(gc.get_count(), (2, 0, 0)) + @refcount_test def test_collect_generations(self): # Avoid future allocation of method object assertEqual = self.assertEqual diff --git a/Lib/test/test_genexps.py b/Lib/test/test_genexps.py index 1f46af1..413043c 100644 --- a/Lib/test/test_genexps.py +++ b/Lib/test/test_genexps.py @@ -257,11 +257,15 @@ Verify that genexps are weakly referencable """ +import sys -__test__ = {'doctests' : doctests} +# Trace function can throw off the tuple reuse test. +if hasattr(sys, 'gettrace') and sys.gettrace(): + __test__ = {} +else: + __test__ = {'doctests' : doctests} def test_main(verbose=None): - import sys from test import support from test import test_genexps support.run_doctest(test_genexps, verbose) diff --git a/Lib/test/test_imp.py b/Lib/test/test_imp.py index d745ae9..83e17d3 100644 --- a/Lib/test/test_imp.py +++ b/Lib/test/test_imp.py @@ -210,6 +210,10 @@ class PEP3147Tests(unittest.TestCase): self.assertEqual( imp.cache_from_source('/foo/bar/baz/qux.py', True), '/foo/bar/baz/__pycache__/qux.{}.pyc'.format(self.tag)) + # Directory with a dot, filename without dot + self.assertEqual( + imp.cache_from_source('/foo.bar/file', True), + '/foo.bar/__pycache__/file{}.pyc'.format(self.tag)) def test_cache_from_source_optimized(self): # Given the path to a .py file, return the path to its PEP 3147 diff --git a/Lib/test/test_import.py b/Lib/test/test_import.py index 95a5f48..0332fdd 100644 --- a/Lib/test/test_import.py +++ b/Lib/test/test_import.py @@ -286,8 +286,6 @@ class ImportTests(unittest.TestCase): self.skipTest('path is not encodable to {}'.format(encoding)) with self.assertRaises(ImportError) as c: __import__(path) - self.assertEqual("Import by filename is not supported.", - c.exception.args[0]) def test_import_in_del_does_not_crash(self): # Issue 4236 diff --git a/Lib/test/test_importhooks.py b/Lib/test/test_importhooks.py index ec6730e..7a25657 100644 --- a/Lib/test/test_importhooks.py +++ b/Lib/test/test_importhooks.py @@ -51,7 +51,7 @@ class TestImporter: def __init__(self, path=test_path): if path != test_path: - # if out class is on sys.path_hooks, we must raise + # if our class is on sys.path_hooks, we must raise # ImportError for any path item that we can't handle. raise ImportError self.path = path @@ -229,7 +229,9 @@ class ImportHooksTestCase(ImportHooksBaseTestCase): i = ImpWrapper() sys.meta_path.append(i) sys.path_hooks.append(ImpWrapper) - mnames = ("colorsys", "urllib.parse", "distutils.core") + mnames = ( + "colorsys", "urllib.parse", "distutils.core", "sys", + ) for mname in mnames: parent = mname.split(".")[0] for n in list(sys.modules): @@ -237,7 +239,8 @@ class ImportHooksTestCase(ImportHooksBaseTestCase): del sys.modules[n] for mname in mnames: m = __import__(mname, globals(), locals(), ["__dummy__"]) - m.__loader__ # to make sure we actually handled the import + # to make sure we actually handled the import + self.assertTrue(hasattr(m, "__loader__")) def test_main(): diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index b665a6c..1821492 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -46,7 +46,7 @@ except ImportError: def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" - with open(__file__, "r", encoding="latin1") as f: + with open(__file__, "r", encoding="latin-1") as f: return f._CHUNK_SIZE @@ -1684,11 +1684,11 @@ class TextIOWrapperTest(unittest.TestCase): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b) - t.__init__(b, encoding="latin1", newline="\r\n") - self.assertEqual(t.encoding, "latin1") + t.__init__(b, encoding="latin-1", newline="\r\n") + self.assertEqual(t.encoding, "latin-1") self.assertEqual(t.line_buffering, False) - t.__init__(b, encoding="utf8", line_buffering=True) - self.assertEqual(t.encoding, "utf8") + t.__init__(b, encoding="utf-8", line_buffering=True) + self.assertEqual(t.encoding, "utf-8") self.assertEqual(t.line_buffering, True) self.assertEqual("\xe9\n", t.readline()) self.assertRaises(TypeError, t.__init__, b, newline=42) @@ -1738,8 +1738,8 @@ class TextIOWrapperTest(unittest.TestCase): def test_encoding(self): # Check the encoding attribute is always set, and valid b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="utf8") - self.assertEqual(t.encoding, "utf8") + t = self.TextIOWrapper(b, encoding="utf-8") + self.assertEqual(t.encoding, "utf-8") t = self.TextIOWrapper(b) self.assertTrue(t.encoding is not None) codecs.lookup(t.encoding) @@ -1918,7 +1918,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): - for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": + for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": f = self.open(support.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.write("abc"), 3) @@ -1968,7 +1968,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(rlines, wlines) def test_telling(self): - f = self.open(support.TESTFN, "w+", encoding="utf8") + f = self.open(support.TESTFN, "w+", encoding="utf-8") p0 = f.tell() f.write("\xff\n") p1 = f.tell() @@ -2214,6 +2214,7 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(support.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") + @support.no_tracing @unittest.skipUnless(threading, 'Threading required for this test.') def test_threads_write(self): # Issue6750: concurrent writes could duplicate data @@ -2670,6 +2671,7 @@ class SignalsTest(unittest.TestCase): def test_interrupted_write_text(self): self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") + @support.no_tracing def check_reentrant_write(self, data, **fdopen_kwargs): def on_alarm(*args): # Will be called reentrantly from the same thread diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 1e4f887..03f814a 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -95,14 +95,14 @@ class TestMailbox(TestBase): """) def test_add_invalid_8bit_bytes_header(self): - key = self._box.add(self._nonascii_msg.encode('latin1')) + key = self._box.add(self._nonascii_msg.encode('latin-1')) self.assertEqual(len(self._box), 1) self.assertEqual(self._box.get_bytes(key), - self._nonascii_msg.encode('latin1')) + self._nonascii_msg.encode('latin-1')) def test_invalid_nonascii_header_as_string(self): subj = self._nonascii_msg.splitlines()[1] - key = self._box.add(subj.encode('latin1')) + key = self._box.add(subj.encode('latin-1')) self.assertEqual(self._box.get_string(key), 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') diff --git a/Lib/test/test_metaclass.py b/Lib/test/test_metaclass.py index 219ab99..6862900 100644 --- a/Lib/test/test_metaclass.py +++ b/Lib/test/test_metaclass.py @@ -246,7 +246,13 @@ Test failures in looking up the __prepare__ method work. """ -__test__ = {'doctests' : doctests} +import sys + +# Trace function introduces __locals__ which causes various tests to fail. +if hasattr(sys, 'gettrace') and sys.gettrace(): + __test__ = {} +else: + __test__ = {'doctests' : doctests} def test_main(verbose=False): from test import support diff --git a/Lib/test/test_minidom.py b/Lib/test/test_minidom.py index 4c2b34a..acc4819 100644 --- a/Lib/test/test_minidom.py +++ b/Lib/test/test_minidom.py @@ -4,9 +4,7 @@ import pickle from test.support import verbose, run_unittest, findfile import unittest -import xml.dom import xml.dom.minidom -import xml.parsers.expat from xml.dom.minidom import parse, Node, Document, parseString from xml.dom.minidom import getDOMImplementation @@ -14,7 +12,6 @@ from xml.dom.minidom import getDOMImplementation tstfile = findfile("test.xml", subdir="xmltestdata") - # The tests of DocumentType importing use these helpers to construct # the documents to work with, since not all DOM builders actually # create the DocumentType nodes. @@ -1009,41 +1006,6 @@ class MinidomTest(unittest.TestCase): "test NodeList.item()") doc.unlink() - def testSAX2DOM(self): - from xml.dom import pulldom - - sax2dom = pulldom.SAX2DOM() - sax2dom.startDocument() - sax2dom.startElement("doc", {}) - sax2dom.characters("text") - sax2dom.startElement("subelm", {}) - sax2dom.characters("text") - sax2dom.endElement("subelm") - sax2dom.characters("text") - sax2dom.endElement("doc") - sax2dom.endDocument() - - doc = sax2dom.document - root = doc.documentElement - (text1, elm1, text2) = root.childNodes - text3 = elm1.childNodes[0] - - self.confirm(text1.previousSibling is None and - text1.nextSibling is elm1 and - elm1.previousSibling is text1 and - elm1.nextSibling is text2 and - text2.previousSibling is elm1 and - text2.nextSibling is None and - text3.previousSibling is None and - text3.nextSibling is None, "testSAX2DOM - siblings") - - self.confirm(root.parentNode is doc and - text1.parentNode is root and - elm1.parentNode is root and - text2.parentNode is root and - text3.parentNode is elm1, "testSAX2DOM - parents") - doc.unlink() - def testEncodings(self): doc = parseString('<foo>€</foo>') self.assertEqual(doc.toxml(), @@ -1490,6 +1452,7 @@ class MinidomTest(unittest.TestCase): doc.appendChild(doc.createComment("foo--bar")) self.assertRaises(ValueError, doc.toxml) + def testEmptyXMLNSValue(self): doc = parseString("<element xmlns=''>\n" "<foo/>\n</element>") diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 644145f..8a33f5e 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -163,6 +163,18 @@ class _TestProcess(BaseTestCase): self.assertEqual(current.ident, os.getpid()) self.assertEqual(current.exitcode, None) + def test_daemon_argument(self): + if self.TYPE == "threads": + return + + # By default uses the current process's daemon flag. + proc0 = self.Process(target=self._test) + self.assertEqual(proc0.daemon, self.current_process().daemon) + proc1 = self.Process(target=self._test, daemon=True) + self.assertTrue(proc1.daemon) + proc2 = self.Process(target=self._test, daemon=False) + self.assertFalse(proc2.daemon) + @classmethod def _test(cls, q, *args, **kwds): current = cls.current_process() diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py index a387f61..ec790ad 100644 --- a/Lib/test/test_nntplib.py +++ b/Lib/test/test_nntplib.py @@ -1,10 +1,11 @@ import io +import socket import datetime import textwrap import unittest import functools import contextlib -import collections +import collections.abc from test import support from nntplib import NNTP, GroupInfo, _have_ssl import nntplib @@ -246,12 +247,32 @@ class NetworkedNNTPTestsMixin: if not name.startswith('test_'): continue meth = getattr(cls, name) - if not isinstance(meth, collections.Callable): + if not isinstance(meth, collections.abc.Callable): continue # Need to use a closure so that meth remains bound to its current # value setattr(cls, name, wrap_meth(meth)) + def test_with_statement(self): + def is_connected(): + if not hasattr(server, 'file'): + return False + try: + server.help() + except (socket.error, EOFError): + return False + return True + + with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: + self.assertTrue(is_connected()) + self.assertTrue(server.help()) + self.assertFalse(is_connected()) + + with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server: + server.quit() + self.assertFalse(is_connected()) + + NetworkedNNTPTestsMixin.wrap_methods() @@ -813,7 +834,7 @@ class NNTPv1v2TestsMixin: def _check_article_body(self, lines): self.assertEqual(len(lines), 4) - self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.") + self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.") self.assertEqual(lines[-2], b"") self.assertEqual(lines[-3], b".Here is a dot-starting line.") self.assertEqual(lines[-4], b"This is just a test article.") diff --git a/Lib/test/test_optparse.py b/Lib/test/test_optparse.py index 7b95612..61f44ee 100644 --- a/Lib/test/test_optparse.py +++ b/Lib/test/test_optparse.py @@ -631,7 +631,7 @@ class TestStandard(BaseTest): option_list=options) def test_required_value(self): - self.assertParseFail(["-a"], "-a option requires an argument") + self.assertParseFail(["-a"], "-a option requires 1 argument") def test_invalid_integer(self): self.assertParseFail(["-b", "5x"], diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 56be375..35aa7fa 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -15,6 +15,13 @@ from test import support import contextlib import mmap import uuid +import asyncore +import asynchat +import socket +try: + import threading +except ImportError: + threading = None # Detect whether we're on a Linux system that uses the (now outdated # and unmaintained) linuxthreads threading library. There's an issue @@ -1280,6 +1287,287 @@ class LoginTests(unittest.TestCase): self.assertNotEqual(len(user_name), 0) +@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), + "needs os.getpriority and os.setpriority") +class ProgramPriorityTests(unittest.TestCase): + """Tests for os.getpriority() and os.setpriority().""" + + def test_set_get_priority(self): + + base = os.getpriority(os.PRIO_PROCESS, os.getpid()) + os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) + try: + new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) + if base >= 19 and new_prio <= 19: + raise unittest.SkipTest( + "unable to reliably test setpriority at current nice level of %s" % base) + else: + self.assertEqual(new_prio, base + 1) + finally: + try: + os.setpriority(os.PRIO_PROCESS, os.getpid(), base) + except OSError as err: + if err.errno != errno.EACCES: + raise + + +class SendfileTestServer(asyncore.dispatcher, threading.Thread): + + class Handler(asynchat.async_chat): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + self.in_buffer = [] + self.closed = False + self.push(b"220 ready\r\n") + + def handle_read(self): + data = self.recv(4096) + self.in_buffer.append(data) + + def get_data(self): + return b''.join(self.in_buffer) + + def handle_close(self): + self.close() + self.closed = True + + def handle_error(self): + raise + + def __init__(self, address): + threading.Thread.__init__(self) + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.bind(address) + self.listen(5) + self.host, self.port = self.socket.getsockname()[:2] + self.handler_instance = None + self._active = False + self._active_lock = threading.Lock() + + # --- public API + + @property + def running(self): + return self._active + + def start(self): + assert not self.running + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def stop(self): + assert self.running + self._active = False + self.join() + + def wait(self): + # wait for handler connection to be closed, then stop the server + while not getattr(self.handler_instance, "closed", False): + time.sleep(0.001) + self.stop() + + # --- internals + + def run(self): + self._active = True + self.__flag.set() + while self._active and asyncore.socket_map: + self._active_lock.acquire() + asyncore.loop(timeout=0.001, count=1) + self._active_lock.release() + asyncore.close_all() + + def handle_accept(self): + conn, addr = self.accept() + self.handler_instance = self.Handler(conn) + + def handle_connect(self): + self.close() + handle_read = handle_connect + + def writable(self): + return 0 + + def handle_error(self): + raise + + +@unittest.skipUnless(threading is not None, "test needs threading module") +@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") +class TestSendfile(unittest.TestCase): + + DATA = b"12345abcde" * 16 * 1024 # 160 KB + SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ + not sys.platform.startswith("solaris") and \ + not sys.platform.startswith("sunos") + + @classmethod + def setUpClass(cls): + with open(support.TESTFN, "wb") as f: + f.write(cls.DATA) + + @classmethod + def tearDownClass(cls): + support.unlink(support.TESTFN) + + def setUp(self): + self.server = SendfileTestServer((support.HOST, 0)) + self.server.start() + self.client = socket.socket() + self.client.connect((self.server.host, self.server.port)) + self.client.settimeout(1) + # synchronize by waiting for "220 ready" response + self.client.recv(1024) + self.sockno = self.client.fileno() + self.file = open(support.TESTFN, 'rb') + self.fileno = self.file.fileno() + + def tearDown(self): + self.file.close() + self.client.close() + if self.server.running: + self.server.stop() + + def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]): + """A higher level wrapper representing how an application is + supposed to use sendfile(). + """ + while 1: + try: + if self.SUPPORT_HEADERS_TRAILERS: + return os.sendfile(sock, file, offset, nbytes, headers, + trailers) + else: + return os.sendfile(sock, file, offset, nbytes) + except OSError as err: + if err.errno == errno.ECONNRESET: + # disconnected + raise + elif err.errno in (errno.EAGAIN, errno.EBUSY): + # we have to retry send data + continue + else: + raise + + def test_send_whole_file(self): + # normal send + total_sent = 0 + offset = 0 + nbytes = 4096 + while total_sent < len(self.DATA): + sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) + if sent == 0: + break + offset += sent + total_sent += sent + self.assertTrue(sent <= nbytes) + self.assertEqual(offset, total_sent) + + self.assertEqual(total_sent, len(self.DATA)) + self.client.shutdown(socket.SHUT_RDWR) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(len(data), len(self.DATA)) + self.assertEqual(data, self.DATA) + + def test_send_at_certain_offset(self): + # start sending a file at a certain offset + total_sent = 0 + offset = len(self.DATA) // 2 + must_send = len(self.DATA) - offset + nbytes = 4096 + while total_sent < must_send: + sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) + if sent == 0: + break + offset += sent + total_sent += sent + self.assertTrue(sent <= nbytes) + + self.client.shutdown(socket.SHUT_RDWR) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + expected = self.DATA[len(self.DATA) // 2:] + self.assertEqual(total_sent, len(expected)) + self.assertEqual(len(data), len(expected)) + self.assertEqual(data, expected) + + def test_offset_overflow(self): + # specify an offset > file size + offset = len(self.DATA) + 4096 + try: + sent = os.sendfile(self.sockno, self.fileno, offset, 4096) + except OSError as e: + # Solaris can raise EINVAL if offset >= file length, ignore. + if e.errno != errno.EINVAL: + raise + else: + self.assertEqual(sent, 0) + self.client.shutdown(socket.SHUT_RDWR) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(data, b'') + + def test_invalid_offset(self): + with self.assertRaises(OSError) as cm: + os.sendfile(self.sockno, self.fileno, -1, 4096) + self.assertEqual(cm.exception.errno, errno.EINVAL) + + # --- headers / trailers tests + + if SUPPORT_HEADERS_TRAILERS: + + def test_headers(self): + total_sent = 0 + sent = os.sendfile(self.sockno, self.fileno, 0, 4096, + headers=[b"x" * 512]) + total_sent += sent + offset = 4096 + nbytes = 4096 + while 1: + sent = self.sendfile_wrapper(self.sockno, self.fileno, + offset, nbytes) + if sent == 0: + break + total_sent += sent + offset += sent + + expected_data = b"x" * 512 + self.DATA + self.assertEqual(total_sent, len(expected_data)) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(hash(data), hash(expected_data)) + + def test_trailers(self): + TESTFN2 = support.TESTFN + "2" + with open(TESTFN2, 'wb') as f: + f.write(b"abcde") + with open(TESTFN2, 'rb')as f: + self.addCleanup(os.remove, TESTFN2) + os.sendfile(self.sockno, f.fileno(), 0, 4096, + trailers=[b"12345"]) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(data, b"abcde12345") + + if hasattr(os, "SF_NODISKIO"): + def test_flags(self): + try: + os.sendfile(self.sockno, self.fileno, 0, 4096, + flags=os.SF_NODISKIO) + except OSError as err: + if err.errno not in (errno.EBUSY, errno.EAGAIN): + raise + + def test_main(): support.run_unittest( FileTests, @@ -1300,6 +1588,8 @@ def test_main(): PidTests, LoginTests, LinkTests, + TestSendfile, + ProgramPriorityTests, ) if __name__ == "__main__": diff --git a/Lib/test/test_osx_env.py b/Lib/test/test_osx_env.py index 8b3df37..24ec2b4 100644 --- a/Lib/test/test_osx_env.py +++ b/Lib/test/test_osx_env.py @@ -5,6 +5,7 @@ Test suite for OS X interpreter environment variables. from test.support import EnvironmentVarGuard, run_unittest import subprocess import sys +import sysconfig import unittest class OSXEnvironmentVariableTestCase(unittest.TestCase): @@ -27,8 +28,6 @@ class OSXEnvironmentVariableTestCase(unittest.TestCase): self._check_sys('PYTHONEXECUTABLE', '==', 'sys.executable') def test_main(): - from distutils import sysconfig - if sys.platform == 'darwin' and sysconfig.get_config_var('WITH_NEXT_FRAMEWORK'): run_unittest(OSXEnvironmentVariableTestCase) diff --git a/Lib/test/test_pdb.py b/Lib/test/test_pdb.py index d861df5..c197aff 100644 --- a/Lib/test/test_pdb.py +++ b/Lib/test/test_pdb.py @@ -20,9 +20,12 @@ class PdbTestInput(object): def __enter__(self): self.real_stdin = sys.stdin sys.stdin = _FakeInput(self.input) + self.orig_trace = sys.gettrace() if hasattr(sys, 'gettrace') else None def __exit__(self, *exc): sys.stdin = self.real_stdin + if self.orig_trace: + sys.settrace(self.orig_trace) def test_pdb_displayhook(): diff --git a/Lib/test/test_peepholer.py b/Lib/test/test_peepholer.py index b7d446f..78de909 100644 --- a/Lib/test/test_peepholer.py +++ b/Lib/test/test_peepholer.py @@ -3,13 +3,16 @@ import re import sys from io import StringIO import unittest +from math import copysign def disassemble(func): f = StringIO() tmp = sys.stdout sys.stdout = f - dis.dis(func) - sys.stdout = tmp + try: + dis.dis(func) + finally: + sys.stdout = tmp result = f.getvalue() f.close() return result @@ -17,6 +20,7 @@ def disassemble(func): def dis_single(line): return disassemble(compile(line, '', 'single')) + class TestTranforms(unittest.TestCase): def test_unot(self): @@ -99,6 +103,12 @@ class TestTranforms(unittest.TestCase): self.assertIn(elem, asm) self.assertNotIn('BUILD_TUPLE', asm) + # Long tuples should be folded too. + asm = dis_single(repr(tuple(range(10000)))) + # One LOAD_CONST for the tuple, one for the None return value + self.assertEqual(asm.count('LOAD_CONST'), 2) + self.assertNotIn('BUILD_TUPLE', asm) + # Bug 1053819: Tuple of constants misidentified when presented with: # . . . opcode_with_arg 100 unary_opcode BUILD_TUPLE 1 . . . # The following would segfault upon compilation @@ -198,6 +208,9 @@ class TestTranforms(unittest.TestCase): def test_folding_of_unaryops_on_constants(self): for line, elem in ( ('-0.5', '(-0.5)'), # unary negative + ('-0.0', '(-0.0)'), # -0.0 + ('-(1.0-1.0)','(-0.0)'), # -0.0 after folding + ('-0', '(0)'), # -0 ('~-2', '(1)'), # unary invert ('+1', '(1)'), # unary positive ): @@ -205,6 +218,13 @@ class TestTranforms(unittest.TestCase): self.assertIn(elem, asm, asm) self.assertNotIn('UNARY_', asm) + # Check that -0.0 works after marshaling + def negzero(): + return -(1.0-1.0) + + self.assertNotIn('UNARY_', disassemble(negzero)) + self.assertTrue(copysign(1.0, negzero()) < 0) + # Verify that unfoldables are skipped for line, elem in ( ('-"abc"', "('abc')"), # unary negative @@ -267,6 +287,25 @@ class TestTranforms(unittest.TestCase): asm = disassemble(f) self.assertNotIn('BINARY_ADD', asm) + def test_constant_folding(self): + # Issue #11244: aggressive constant folding. + exprs = [ + "3 * -5", + "-3 * 5", + "2 * (3 * 4)", + "(2 * 3) * 4", + "(-1, 2, 3)", + "(1, -2, 3)", + "(1, 2, -3)", + "(1, 2, -3) * 6", + "lambda x: x in {(3 * -5) + (-1 - 6), (1, -2, 3) * 2, None}", + ] + for e in exprs: + asm = dis_single(e) + self.assertNotIn('UNARY_', asm, e) + self.assertNotIn('BINARY_', asm, e) + self.assertNotIn('BUILD_', asm, e) + class TestBuglets(unittest.TestCase): def test_bug_11510(self): diff --git a/Lib/test/test_pep292.py b/Lib/test/test_pep292.py index 119c7ea..a1e52e9 100644 --- a/Lib/test/test_pep292.py +++ b/Lib/test/test_pep292.py @@ -49,11 +49,11 @@ class TestTemplate(unittest.TestCase): (?P<invalid>) | (?P<escaped>%(delim)s) | @(?P<named>%(id)s) | - @{(?P<braced>%(id)s)} + @{(?P<braced>%(id)s)} ) """ s = MyPattern('$') - self.assertRaises(ValueError, s.substitute, dict()) + self.assertRaises(ValueError, s.substitute, dict()) def test_percents(self): eq = self.assertEqual diff --git a/Lib/test/test_pep3120.py b/Lib/test/test_pep3120.py index 09fedf0..496f8da 100644 --- a/Lib/test/test_pep3120.py +++ b/Lib/test/test_pep3120.py @@ -19,8 +19,8 @@ class PEP3120Test(unittest.TestCase): try: import test.badsyntax_pep3120 except SyntaxError as msg: - msg = str(msg) - self.assertTrue('UTF-8' in msg or 'utf8' in msg) + msg = str(msg).lower() + self.assertTrue('utf-8' in msg or 'utf8' in msg) else: self.fail("expected exception didn't occur") diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py index ec134a1..b59f6e6 100644 --- a/Lib/test/test_platform.py +++ b/Lib/test/test_platform.py @@ -56,13 +56,11 @@ class PlatformTest(unittest.TestCase): def setUp(self): self.save_version = sys.version - self.save_subversion = sys.subversion self.save_mercurial = sys._mercurial self.save_platform = sys.platform def tearDown(self): sys.version = self.save_version - sys.subversion = self.save_subversion sys._mercurial = self.save_mercurial sys.platform = self.save_platform @@ -77,7 +75,7 @@ class PlatformTest(unittest.TestCase): ('IronPython', '1.0.0', '', '', '', '', '.NET 2.0.50727.42')), ): # branch and revision are not "parsed", but fetched - # from sys.subversion. Ignore them + # from sys._mercurial. Ignore them (name, version, branch, revision, buildno, builddate, compiler) \ = platform._sys_version(input) self.assertEqual( @@ -113,8 +111,6 @@ class PlatformTest(unittest.TestCase): if subversion is None: if hasattr(sys, "_mercurial"): del sys._mercurial - if hasattr(sys, "subversion"): - del sys.subversion else: sys._mercurial = subversion if sys_platform is not None: @@ -247,6 +243,34 @@ class PlatformTest(unittest.TestCase): ): self.assertEqual(platform._parse_release_file(input), output) + def test_popen(self): + mswindows = (sys.platform == "win32") + + if mswindows: + command = '"{}" -c "print(\'Hello\')"'.format(sys.executable) + else: + command = "'{}' -c 'print(\"Hello\")'".format(sys.executable) + with platform.popen(command) as stdout: + hello = stdout.read().strip() + stdout.close() + self.assertEqual(hello, "Hello") + + data = 'plop' + if mswindows: + command = '"{}" -c "import sys; data=sys.stdin.read(); exit(len(data))"' + else: + command = "'{}' -c 'import sys; data=sys.stdin.read(); exit(len(data))'" + command = command.format(sys.executable) + with platform.popen(command, 'w') as stdin: + stdout = stdin.write(data) + ret = stdin.close() + self.assertIsNotNone(ret) + if os.name == 'nt': + returncode = ret + else: + returncode = ret >> 8 + self.assertEqual(returncode, len(data)) + def test_main(): support.run_unittest( diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index 81af569..0a3adcc 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -108,6 +108,10 @@ class DummyPOP3Handler(asynchat.async_chat): def cmd_apop(self, arg): self.push('+OK done nothing.') + def cmd_quit(self, arg): + self.push('+OK closing.') + self.close_when_done() + class DummyPOP3Server(asyncore.dispatcher, threading.Thread): @@ -165,10 +169,10 @@ class TestPOP3Class(TestCase): def setUp(self): self.server = DummyPOP3Server((HOST, PORT)) self.server.start() - self.client = poplib.POP3(self.server.host, self.server.port) + self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) def tearDown(self): - self.client.quit() + self.client.close() self.server.stop() def test_getwelcome(self): @@ -228,6 +232,12 @@ class TestPOP3Class(TestCase): self.client.uidl() self.client.uidl('foo') + def test_quit(self): + resp = self.client.quit() + self.assertTrue(resp) + self.assertIsNone(self.client.sock) + self.assertIsNone(self.client.file) + SUPPORTS_SSL = False if hasattr(poplib, 'POP3_SSL'): @@ -274,6 +284,7 @@ if hasattr(poplib, 'POP3_SSL'): else: DummyPOP3Handler.handle_read(self) + class TestPOP3_SSLClass(TestPOP3Class): # repeat previous tests by using poplib.POP3_SSL diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 45b3afc..0e9ac75 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -37,7 +37,7 @@ class PosixTester(unittest.TestCase): NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", "times", "getloadavg", "getegid", "geteuid", "getgid", "getgroups", - "getpid", "getpgrp", "getppid", "getuid", + "getpid", "getpgrp", "getppid", "getuid", "sync", ] for name in NO_ARG_FUNCTIONS: @@ -132,6 +132,156 @@ class PosixTester(unittest.TestCase): finally: fp.close() + @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") + def test_truncate(self): + with open(support.TESTFN, 'w') as fp: + fp.write('test') + fp.flush() + posix.truncate(support.TESTFN, 0) + + @unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()") + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_fexecve(self): + fp = os.open(sys.executable, os.O_RDONLY) + try: + pid = os.fork() + if pid == 0: + os.chdir(os.path.split(sys.executable)[0]) + posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ) + else: + self.assertEqual(os.waitpid(pid, 0), (pid, 0)) + finally: + os.close(fp) + + @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + def test_waitid(self): + pid = os.fork() + if pid == 0: + os.chdir(os.path.split(sys.executable)[0]) + posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) + else: + res = posix.waitid(posix.P_PID, pid, posix.WEXITED) + self.assertEqual(pid, res.si_pid) + + @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") + def test_lockf(self): + fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) + try: + os.write(fd, b'test') + os.lseek(fd, 0, os.SEEK_SET) + posix.lockf(fd, posix.F_LOCK, 4) + # section is locked + posix.lockf(fd, posix.F_ULOCK, 4) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") + def test_pread(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.write(fd, b'test') + os.lseek(fd, 0, os.SEEK_SET) + self.assertEqual(b'es', posix.pread(fd, 2, 1)) + # the first pread() shoudn't disturb the file offset + self.assertEqual(b'te', posix.read(fd, 2)) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") + def test_pwrite(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.write(fd, b'test') + os.lseek(fd, 0, os.SEEK_SET) + posix.pwrite(fd, b'xx', 1) + self.assertEqual(b'txxt', posix.read(fd, 4)) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), + "test needs posix.posix_fallocate()") + def test_posix_fallocate(self): + fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) + try: + posix.posix_fallocate(fd, 0, 10) + except OSError as inst: + # issue10812, ZFS doesn't appear to support posix_fallocate, + # so skip Solaris-based since they are likely to have ZFS. + if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"): + raise + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), + "test needs posix.posix_fadvise()") + def test_posix_fadvise(self): + fd = os.open(support.TESTFN, os.O_RDONLY) + try: + posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()") + def test_futimes(self): + now = time.time() + fd = os.open(support.TESTFN, os.O_RDONLY) + try: + posix.futimes(fd, None) + self.assertRaises(TypeError, posix.futimes, fd, (None, None)) + self.assertRaises(TypeError, posix.futimes, fd, (now, None)) + self.assertRaises(TypeError, posix.futimes, fd, (None, now)) + posix.futimes(fd, (int(now), int(now))) + posix.futimes(fd, (now, now)) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()") + def test_lutimes(self): + now = time.time() + posix.lutimes(support.TESTFN, None) + self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None)) + self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None)) + self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now)) + posix.lutimes(support.TESTFN, (int(now), int(now))) + posix.lutimes(support.TESTFN, (now, now)) + + @unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens()") + def test_futimens(self): + now = time.time() + fd = os.open(support.TESTFN, os.O_RDONLY) + try: + self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None, None)) + self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None) + self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0)) + posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)), + (int(now), int((now - int(now)) * 1e9))) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") + def test_writev(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.writev(fd, (b'test1', b'tt2', b't3')) + os.lseek(fd, 0, os.SEEK_SET) + self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") + def test_readv(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.write(fd, b'test1tt2t3') + os.lseek(fd, 0, os.SEEK_SET) + buf = [bytearray(i) for i in [5, 3, 2]] + self.assertEqual(posix.readv(fd, buf), 10) + self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) + finally: + os.close(fd) + def test_dup(self): if hasattr(posix, 'dup'): fp = open(support.TESTFN) @@ -285,6 +435,18 @@ class PosixTester(unittest.TestCase): if hasattr(posix, 'listdir'): self.assertTrue(support.TESTFN in posix.listdir()) + @unittest.skipUnless(hasattr(posix, 'fdlistdir'), "test needs posix.fdlistdir()") + def test_fdlistdir(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + self.assertEqual( + sorted(posix.listdir('.')), + sorted(posix.fdlistdir(f)) + ) + # Check the fd was closed by fdlistdir + with self.assertRaises(OSError) as ctx: + posix.close(f) + self.assertEqual(ctx.exception.errno, errno.EBADF) + def test_access(self): if hasattr(posix, 'access'): self.assertTrue(posix.access(support.TESTFN, os.R_OK)) @@ -389,6 +551,198 @@ class PosixTester(unittest.TestCase): set([int(x) for x in groups.split()]), set(posix.getgroups() + [posix.getegid()])) + # tests for the posix *at functions follow + + @unittest.skipUnless(hasattr(posix, 'faccessat'), "test needs posix.faccessat()") + def test_faccessat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + self.assertTrue(posix.faccessat(f, support.TESTFN, os.R_OK)) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'fchmodat'), "test needs posix.fchmodat()") + def test_fchmodat(self): + os.chmod(support.TESTFN, stat.S_IRUSR) + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.fchmodat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) + + s = posix.stat(support.TESTFN) + self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'fchownat'), "test needs posix.fchownat()") + def test_fchownat(self): + support.unlink(support.TESTFN) + open(support.TESTFN, 'w').close() + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.fchownat(f, support.TESTFN, os.getuid(), os.getgid()) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'fstatat'), "test needs posix.fstatat()") + def test_fstatat(self): + support.unlink(support.TESTFN) + with open(support.TESTFN, 'w') as outfile: + outfile.write("testline\n") + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + s1 = posix.stat(support.TESTFN) + s2 = posix.fstatat(f, support.TESTFN) + self.assertEqual(s1, s2) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'futimesat'), "test needs posix.futimesat()") + def test_futimesat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + now = time.time() + posix.futimesat(f, support.TESTFN, None) + self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, None)) + self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (now, None)) + self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, now)) + posix.futimesat(f, support.TESTFN, (int(now), int(now))) + posix.futimesat(f, support.TESTFN, (now, now)) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'linkat'), "test needs posix.linkat()") + def test_linkat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.linkat(f, support.TESTFN, f, support.TESTFN + 'link') + # should have same inodes + self.assertEqual(posix.stat(support.TESTFN)[1], + posix.stat(support.TESTFN + 'link')[1]) + finally: + posix.close(f) + support.unlink(support.TESTFN + 'link') + + @unittest.skipUnless(hasattr(posix, 'mkdirat'), "test needs posix.mkdirat()") + def test_mkdirat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mkdirat(f, support.TESTFN + 'dir') + posix.stat(support.TESTFN + 'dir') # should not raise exception + finally: + posix.close(f) + support.rmtree(support.TESTFN + 'dir') + + @unittest.skipUnless(hasattr(posix, 'mknodat') and hasattr(stat, 'S_IFIFO'), + "don't have mknodat()/S_IFIFO") + def test_mknodat(self): + # Test using mknodat() to create a FIFO (the only use specified + # by POSIX). + support.unlink(support.TESTFN) + mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mknodat(f, support.TESTFN, mode, 0) + except OSError as e: + # Some old systems don't allow unprivileged users to use + # mknod(), or only support creating device nodes. + self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) + else: + self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'openat'), "test needs posix.openat()") + def test_openat(self): + support.unlink(support.TESTFN) + with open(support.TESTFN, 'w') as outfile: + outfile.write("testline\n") + a = posix.open(posix.getcwd(), posix.O_RDONLY) + b = posix.openat(a, support.TESTFN, posix.O_RDONLY) + try: + res = posix.read(b, 9).decode(encoding="utf-8") + self.assertEqual("testline\n", res) + finally: + posix.close(a) + posix.close(b) + + @unittest.skipUnless(hasattr(posix, 'readlinkat'), "test needs posix.readlinkat()") + def test_readlinkat(self): + os.symlink(support.TESTFN, support.TESTFN + 'link') + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + self.assertEqual(posix.readlink(support.TESTFN + 'link'), + posix.readlinkat(f, support.TESTFN + 'link')) + finally: + support.unlink(support.TESTFN + 'link') + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'renameat'), "test needs posix.renameat()") + def test_renameat(self): + support.unlink(support.TESTFN) + open(support.TESTFN + 'ren', 'w').close() + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.renameat(f, support.TESTFN + 'ren', f, support.TESTFN) + except: + posix.rename(support.TESTFN + 'ren', support.TESTFN) + raise + else: + posix.stat(support.TESTFN) # should not throw exception + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'symlinkat'), "test needs posix.symlinkat()") + def test_symlinkat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.symlinkat(support.TESTFN, f, support.TESTFN + 'link') + self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) + finally: + posix.close(f) + support.unlink(support.TESTFN + 'link') + + @unittest.skipUnless(hasattr(posix, 'unlinkat'), "test needs posix.unlinkat()") + def test_unlinkat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + open(support.TESTFN + 'del', 'w').close() + posix.stat(support.TESTFN + 'del') # should not throw exception + try: + posix.unlinkat(f, support.TESTFN + 'del') + except: + support.unlink(support.TESTFN + 'del') + raise + else: + self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'utimensat'), "test needs posix.utimensat()") + def test_utimensat(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + now = time.time() + posix.utimensat(f, support.TESTFN, None, None) + self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (None, None), (None, None)) + self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (now, 0), None) + self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, None, (now, 0)) + posix.utimensat(f, support.TESTFN, (int(now), int((now - int(now)) * 1e9)), + (int(now), int((now - int(now)) * 1e9))) + finally: + posix.close(f) + + @unittest.skipUnless(hasattr(posix, 'mkfifoat'), "don't have mkfifoat()") + def test_mkfifoat(self): + support.unlink(support.TESTFN) + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mkfifoat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) + self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) + finally: + posix.close(f) + class PosixGroupsTester(unittest.TestCase): def setUp(self): @@ -426,7 +780,10 @@ class PosixGroupsTester(unittest.TestCase): def test_main(): - support.run_unittest(PosixTester, PosixGroupsTester) + try: + support.run_unittest(PosixTester, PosixGroupsTester) + finally: + support.reap_children() if __name__ == '__main__': test_main() diff --git a/Lib/test/test_pulldom.py b/Lib/test/test_pulldom.py new file mode 100644 index 0000000..b81a595 --- /dev/null +++ b/Lib/test/test_pulldom.py @@ -0,0 +1,347 @@ +import io +import unittest +import sys +import xml.sax + +from xml.sax.xmlreader import AttributesImpl +from xml.dom import pulldom + +from test.support import run_unittest, findfile + + +tstfile = findfile("test.xml", subdir="xmltestdata") + +# A handy XML snippet, containing attributes, a namespace prefix, and a +# self-closing tag: +SMALL_SAMPLE = """<?xml version="1.0"?> +<html xmlns="http://www.w3.org/1999/xhtml" xmlns:xdc="http://www.xml.com/books"> +<!-- A comment --> +<title>Introduction to XSL</title> +<hr/> +<p><xdc:author xdc:attrib="prefixed attribute" attrib="other attrib">A. Namespace</xdc:author></p> +</html>""" + + +class PullDOMTestCase(unittest.TestCase): + + def test_parse(self): + """Minimal test of DOMEventStream.parse()""" + + # This just tests that parsing from a stream works. Actual parser + # semantics are tested using parseString with a more focused XML + # fragment. + + # Test with a filename: + handler = pulldom.parse(tstfile) + self.addCleanup(handler.stream.close) + list(handler) + + # Test with a file object: + with open(tstfile, "rb") as fin: + list(pulldom.parse(fin)) + + def test_parse_semantics(self): + """Test DOMEventStream parsing semantics.""" + + items = pulldom.parseString(SMALL_SAMPLE) + evt, node = next(items) + # Just check the node is a Document: + self.assertTrue(hasattr(node, "createElement")) + self.assertEqual(pulldom.START_DOCUMENT, evt) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("html", node.tagName) + self.assertEqual(2, len(node.attributes)) + self.assertEqual(node.attributes.getNamedItem("xmlns:xdc").value, + "http://www.xml.com/books") + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) # Line break + evt, node = next(items) + # XXX - A comment should be reported here! + # self.assertEqual(pulldom.COMMENT, evt) + # Line break after swallowed comment: + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual("title", node.tagName) + title_node = node + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + self.assertEqual("Introduction to XSL", node.data) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("title", node.tagName) + self.assertTrue(title_node is node) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("hr", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("hr", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("p", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("xdc:author", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("xdc:author", node.tagName) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + evt, node = next(items) + self.assertEqual(pulldom.CHARACTERS, evt) + evt, node = next(items) + self.assertEqual(pulldom.END_ELEMENT, evt) + # XXX No END_DOCUMENT item is ever obtained: + #evt, node = next(items) + #self.assertEqual(pulldom.END_DOCUMENT, evt) + + def test_expandItem(self): + """Ensure expandItem works as expected.""" + items = pulldom.parseString(SMALL_SAMPLE) + # Loop through the nodes until we get to a "title" start tag: + for evt, item in items: + if evt == pulldom.START_ELEMENT and item.tagName == "title": + items.expandNode(item) + self.assertEqual(1, len(item.childNodes)) + break + else: + self.fail("No \"title\" element detected in SMALL_SAMPLE!") + # Loop until we get to the next start-element: + for evt, node in items: + if evt == pulldom.START_ELEMENT: + break + self.assertEqual("hr", node.tagName, + "expandNode did not leave DOMEventStream in the correct state.") + # Attempt to expand a standalone element: + items.expandNode(node) + self.assertEqual(next(items)[0], pulldom.CHARACTERS) + evt, node = next(items) + self.assertEqual(node.tagName, "p") + items.expandNode(node) + next(items) # Skip character data + evt, node = next(items) + self.assertEqual(node.tagName, "html") + with self.assertRaises(StopIteration): + next(items) + items.clear() + self.assertIsNone(items.parser) + self.assertIsNone(items.stream) + + @unittest.expectedFailure + def test_comment(self): + """PullDOM does not receive "comment" events.""" + items = pulldom.parseString(SMALL_SAMPLE) + for evt, _ in items: + if evt == pulldom.COMMENT: + break + else: + self.fail("No comment was encountered") + + @unittest.expectedFailure + def test_end_document(self): + """PullDOM does not receive "end-document" events.""" + items = pulldom.parseString(SMALL_SAMPLE) + # Read all of the nodes up to and including </html>: + for evt, node in items: + if evt == pulldom.END_ELEMENT and node.tagName == "html": + break + try: + # Assert that the next node is END_DOCUMENT: + evt, node = next(items) + self.assertEqual(pulldom.END_DOCUMENT, evt) + except StopIteration: + self.fail( + "Ran out of events, but should have received END_DOCUMENT") + + +class ThoroughTestCase(unittest.TestCase): + """Test the hard-to-reach parts of pulldom.""" + + def test_thorough_parse(self): + """Test some of the hard-to-reach parts of PullDOM.""" + self._test_thorough(pulldom.parse(None, parser=SAXExerciser())) + + @unittest.expectedFailure + def test_sax2dom_fail(self): + """SAX2DOM can"t handle a PI before the root element.""" + pd = SAX2DOMTestHelper(None, SAXExerciser(), 12) + self._test_thorough(pd) + + def test_thorough_sax2dom(self): + """Test some of the hard-to-reach parts of SAX2DOM.""" + pd = SAX2DOMTestHelper(None, SAX2DOMExerciser(), 12) + self._test_thorough(pd, False) + + def _test_thorough(self, pd, before_root=True): + """Test some of the hard-to-reach parts of the parser, using a mock + parser.""" + + evt, node = next(pd) + self.assertEqual(pulldom.START_DOCUMENT, evt) + # Just check the node is a Document: + self.assertTrue(hasattr(node, "createElement")) + + if before_root: + evt, node = next(pd) + self.assertEqual(pulldom.COMMENT, evt) + self.assertEqual("a comment", node.data) + evt, node = next(pd) + self.assertEqual(pulldom.PROCESSING_INSTRUCTION, evt) + self.assertEqual("target", node.target) + self.assertEqual("data", node.data) + + evt, node = next(pd) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("html", node.tagName) + + evt, node = next(pd) + self.assertEqual(pulldom.COMMENT, evt) + self.assertEqual("a comment", node.data) + evt, node = next(pd) + self.assertEqual(pulldom.PROCESSING_INSTRUCTION, evt) + self.assertEqual("target", node.target) + self.assertEqual("data", node.data) + + evt, node = next(pd) + self.assertEqual(pulldom.START_ELEMENT, evt) + self.assertEqual("p", node.tagName) + + evt, node = next(pd) + self.assertEqual(pulldom.CHARACTERS, evt) + self.assertEqual("text", node.data) + evt, node = next(pd) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("p", node.tagName) + evt, node = next(pd) + self.assertEqual(pulldom.END_ELEMENT, evt) + self.assertEqual("html", node.tagName) + evt, node = next(pd) + self.assertEqual(pulldom.END_DOCUMENT, evt) + + +class SAXExerciser(object): + """A fake sax parser that calls some of the harder-to-reach sax methods to + ensure it emits the correct events""" + + def setContentHandler(self, handler): + self._handler = handler + + def parse(self, _): + h = self._handler + h.startDocument() + + # The next two items ensure that items preceding the first + # start_element are properly stored and emitted: + h.comment("a comment") + h.processingInstruction("target", "data") + + h.startElement("html", AttributesImpl({})) + + h.comment("a comment") + h.processingInstruction("target", "data") + + h.startElement("p", AttributesImpl({"class": "paraclass"})) + h.characters("text") + h.endElement("p") + h.endElement("html") + h.endDocument() + + def stub(self, *args, **kwargs): + """Stub method. Does nothing.""" + pass + setProperty = stub + setFeature = stub + + +class SAX2DOMExerciser(SAXExerciser): + """The same as SAXExerciser, but without the processing instruction and + comment before the root element, because S2D can"t handle it""" + + def parse(self, _): + h = self._handler + h.startDocument() + h.startElement("html", AttributesImpl({})) + h.comment("a comment") + h.processingInstruction("target", "data") + h.startElement("p", AttributesImpl({"class": "paraclass"})) + h.characters("text") + h.endElement("p") + h.endElement("html") + h.endDocument() + + +class SAX2DOMTestHelper(pulldom.DOMEventStream): + """Allows us to drive SAX2DOM from a DOMEventStream.""" + + def reset(self): + self.pulldom = pulldom.SAX2DOM() + # This content handler relies on namespace support + self.parser.setFeature(xml.sax.handler.feature_namespaces, 1) + self.parser.setContentHandler(self.pulldom) + + +class SAX2DOMTestCase(unittest.TestCase): + + def confirm(self, test, testname="Test"): + self.assertTrue(test, testname) + + def test_basic(self): + """Ensure SAX2DOM can parse from a stream.""" + with io.StringIO(SMALL_SAMPLE) as fin: + sd = SAX2DOMTestHelper(fin, xml.sax.make_parser(), + len(SMALL_SAMPLE)) + for evt, node in sd: + if evt == pulldom.START_ELEMENT and node.tagName == "html": + break + # Because the buffer is the same length as the XML, all the + # nodes should have been parsed and added: + self.assertGreater(len(node.childNodes), 0) + + def testSAX2DOM(self): + """Ensure SAX2DOM expands nodes as expected.""" + sax2dom = pulldom.SAX2DOM() + sax2dom.startDocument() + sax2dom.startElement("doc", {}) + sax2dom.characters("text") + sax2dom.startElement("subelm", {}) + sax2dom.characters("text") + sax2dom.endElement("subelm") + sax2dom.characters("text") + sax2dom.endElement("doc") + sax2dom.endDocument() + + doc = sax2dom.document + root = doc.documentElement + (text1, elm1, text2) = root.childNodes + text3 = elm1.childNodes[0] + + self.assertIsNone(text1.previousSibling) + self.assertIs(text1.nextSibling, elm1) + self.assertIs(elm1.previousSibling, text1) + self.assertIs(elm1.nextSibling, text2) + self.assertIs(text2.previousSibling, elm1) + self.assertIsNone(text2.nextSibling) + self.assertIsNone(text3.previousSibling) + self.assertIsNone(text3.nextSibling) + + self.assertIs(root.parentNode, doc) + self.assertIs(text1.parentNode, root) + self.assertIs(elm1.parentNode, root) + self.assertIs(text2.parentNode, root) + self.assertIs(text3.parentNode, elm1) + doc.unlink() + + +def test_main(): + run_unittest(PullDOMTestCase, ThoroughTestCase, SAX2DOMTestCase) + + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py index 1d575cd..0ecf6a5 100644 --- a/Lib/test/test_pydoc.py +++ b/Lib/test/test_pydoc.py @@ -190,7 +190,7 @@ war</tt></dd></dl> missing_pattern = "no Python documentation found for '%s'" # output pattern for module with bad imports -badimport_pattern = "problem in %s - ImportError: No module named %s" +badimport_pattern = "problem in %s - ImportError: No module named %r" def run_pydoc(module_name, *args): """ @@ -248,6 +248,8 @@ class PydocDocTest(unittest.TestCase): @unittest.skipIf(sys.flags.optimize >= 2, "Docstrings are omitted with -O2 and above") + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __locals__ unexpectedly') def test_html_doc(self): result, doc_loc = get_pydoc_html(pydoc_mod) mod_file = inspect.getabsfile(pydoc_mod) @@ -263,6 +265,8 @@ class PydocDocTest(unittest.TestCase): @unittest.skipIf(sys.flags.optimize >= 2, "Docstrings are omitted with -O2 and above") + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __locals__ unexpectedly') def test_text_doc(self): result, doc_loc = get_pydoc_text(pydoc_mod) expected_text = expected_text_pattern % \ @@ -340,6 +344,8 @@ class PydocDocTest(unittest.TestCase): @unittest.skipIf(sys.flags.optimize >= 2, 'Docstrings are omitted with -O2 and above') + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'trace function introduces __locals__ unexpectedly') def test_help_output_redirect(self): # issue 940286, if output is set in Helper, then all output from # Helper.help should be redirected diff --git a/Lib/test/test_reprlib.py b/Lib/test/test_reprlib.py index b0dc4d7..e476941 100644 --- a/Lib/test/test_reprlib.py +++ b/Lib/test/test_reprlib.py @@ -234,7 +234,7 @@ class LongReprTest(unittest.TestCase): touch(os.path.join(self.subpkgname, self.pkgname + '.py')) from areallylongpackageandmodulenametotestreprtruncation.areallylongpackageandmodulenametotestreprtruncation import areallylongpackageandmodulenametotestreprtruncation eq(repr(areallylongpackageandmodulenametotestreprtruncation), - "<module '%s' from '%s'>" % (areallylongpackageandmodulenametotestreprtruncation.__name__, areallylongpackageandmodulenametotestreprtruncation.__file__)) + "<module %r from %r>" % (areallylongpackageandmodulenametotestreprtruncation.__name__, areallylongpackageandmodulenametotestreprtruncation.__file__)) eq(repr(sys), "<module 'sys' (built-in)>") def test_type(self): diff --git a/Lib/test/test_richcmp.py b/Lib/test/test_richcmp.py index f8f3717..0b629dc 100644 --- a/Lib/test/test_richcmp.py +++ b/Lib/test/test_richcmp.py @@ -220,6 +220,7 @@ class MiscTest(unittest.TestCase): for func in (do, operator.not_): self.assertRaises(Exc, func, Bad()) + @support.no_tracing def test_recursion(self): # Check that comparison for recursive objects fails gracefully from collections import UserList diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py index ad3ab39..00f34b1 100644 --- a/Lib/test/test_runpy.py +++ b/Lib/test/test_runpy.py @@ -6,7 +6,8 @@ import sys import re import tempfile import py_compile -from test.support import forget, make_legacy_pyc, run_unittest, unload, verbose +from test.support import ( + forget, make_legacy_pyc, run_unittest, unload, verbose, no_tracing) from test.script_helper import ( make_pkg, make_script, make_zip_pkg, make_zip_script, temp_dir) @@ -395,6 +396,7 @@ argv0 = sys.argv[0] msg = "can't find '__main__' module in %r" % zip_name self._check_import_error(zip_name, msg) + @no_tracing def test_main_recursion_error(self): with temp_dir() as script_dir, temp_dir() as dummy_dir: mod_name = '__main__' diff --git a/Lib/test/test_sax.py b/Lib/test/test_sax.py index 0f6a1ca..bddb375 100644 --- a/Lib/test/test_sax.py +++ b/Lib/test/test_sax.py @@ -20,8 +20,8 @@ import unittest TEST_XMLFILE = findfile("test.xml", subdir="xmltestdata") TEST_XMLFILE_OUT = findfile("test.xml.out", subdir="xmltestdata") try: - TEST_XMLFILE.encode("utf8") - TEST_XMLFILE_OUT.encode("utf8") + TEST_XMLFILE.encode("utf-8") + TEST_XMLFILE_OUT.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("filename is not encodable to utf8") diff --git a/Lib/test/test_scope.py b/Lib/test/test_scope.py index fbc87aa..129a18a 100644 --- a/Lib/test/test_scope.py +++ b/Lib/test/test_scope.py @@ -1,5 +1,5 @@ import unittest -from test.support import check_syntax_error, run_unittest +from test.support import check_syntax_error, cpython_only, run_unittest class ScopeTests(unittest.TestCase): @@ -496,23 +496,22 @@ class ScopeTests(unittest.TestCase): self.assertNotIn("x", varnames) self.assertIn("y", varnames) + @cpython_only def testLocalsClass_WithTrace(self): # Issue23728: after the trace function returns, the locals() # dictionary is used to update all variables, this used to # include free variables. But in class statements, free # variables are not inserted... import sys + self.addCleanup(sys.settrace, sys.gettrace()) sys.settrace(lambda a,b,c:None) - try: - x = 12 + x = 12 - class C: - def f(self): - return x + class C: + def f(self): + return x - self.assertEqual(x, 12) # Used to raise UnboundLocalError - finally: - sys.settrace(None) + self.assertEqual(x, 12) # Used to raise UnboundLocalError def testBoundAndFree(self): # var is bound and free in class @@ -527,6 +526,7 @@ class ScopeTests(unittest.TestCase): inst = f(3)() self.assertEqual(inst.a, inst.m()) + @cpython_only def testInteractionWithTraceFunc(self): import sys @@ -543,6 +543,7 @@ class ScopeTests(unittest.TestCase): class TestClass: pass + self.addCleanup(sys.settrace, sys.gettrace()) sys.settrace(tracer) adaptgetter("foo", TestClass, (1, "")) sys.settrace(None) diff --git a/Lib/test/test_shelve.py b/Lib/test/test_shelve.py index 3e73f52..13c1265 100644 --- a/Lib/test/test_shelve.py +++ b/Lib/test/test_shelve.py @@ -2,7 +2,7 @@ import unittest import shelve import glob from test import support -from collections import MutableMapping +from collections.abc import MutableMapping from test.test_dbm import dbm_iterator def L1(s): @@ -129,8 +129,8 @@ class TestCase(unittest.TestCase): shelve.Shelf(d)[key] = [1] self.assertIn(key.encode('utf-8'), d) # but a different one can be given - shelve.Shelf(d, keyencoding='latin1')[key] = [1] - self.assertIn(key.encode('latin1'), d) + shelve.Shelf(d, keyencoding='latin-1')[key] = [1] + self.assertIn(key.encode('latin-1'), d) # with all consequences s = shelve.Shelf(d, keyencoding='ascii') self.assertRaises(UnicodeEncodeError, s.__setitem__, key, [1]) diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 4651f37..d973faa 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -424,6 +424,9 @@ sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], # Simulated SMTP channel & server class SimSMTPChannel(smtpd.SMTPChannel): + # For testing failures in QUIT when using the context manager API. + quit_response = None + def __init__(self, extra_features, *args, **kw): self._extrafeatures = ''.join( [ "250-{0}\r\n".format(x) for x in extra_features ]) @@ -475,19 +478,31 @@ class SimSMTPChannel(smtpd.SMTPChannel): else: self.push('550 No access for you!') + def smtp_QUIT(self, arg): + # args is ignored + if self.quit_response is None: + super(SimSMTPChannel, self).smtp_QUIT(arg) + else: + self.push(self.quit_response) + self.close_when_done() + def handle_error(self): raise class SimSMTPServer(smtpd.SMTPServer): + # For testing failures in QUIT when using the context manager API. + quit_response = None + def __init__(self, *args, **kw): self._extra_features = [] smtpd.SMTPServer.__init__(self, *args, **kw) def handle_accepted(self, conn, addr): - self._SMTPchannel = SimSMTPChannel(self._extra_features, - self, conn, addr) + self._SMTPchannel = SimSMTPChannel( + self._extra_features, self, conn, addr) + self._SMTPchannel.quit_response = self.quit_response def process_message(self, peer, mailfrom, rcpttos, data): pass @@ -620,6 +635,25 @@ class SMTPSimTests(unittest.TestCase): self.assertIn(sim_auth_credentials['cram-md5'], str(err)) smtp.close() + def test_with_statement(self): + with smtplib.SMTP(HOST, self.port) as smtp: + code, message = smtp.noop() + self.assertEqual(code, 250) + self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') + with smtplib.SMTP(HOST, self.port) as smtp: + smtp.close() + self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') + + def test_with_statement_QUIT_failure(self): + self.serv.quit_response = '421 QUIT FAILED' + with self.assertRaises(smtplib.SMTPResponseException) as error: + with smtplib.SMTP(HOST, self.port) as smtp: + smtp.noop() + self.assertEqual(error.exception.smtp_code, 421) + self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') + # We don't need to clean up self.serv.quit_response because a new + # server is always instantiated in the setUp(). + #TODO: add tests for correct AUTH method fallback now that the #test infrastructure can support it. diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 9ba391e..8b23ae9 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -18,6 +18,7 @@ import contextlib from weakref import proxy import signal import math +import pickle try: import fcntl except ImportError: @@ -44,7 +45,7 @@ def linux_version(): return 0, 0, 0 HOST = support.HOST -MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf8') ## test unicode string and carriage return +MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6) try: @@ -325,6 +326,26 @@ class GeneralModuleTests(unittest.TestCase): if not fqhn in all_host_names: self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) + @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") + @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") + def test_sethostname(self): + oldhn = socket.gethostname() + try: + socket.sethostname('new') + except socket.error as e: + if e.errno == errno.EPERM: + self.skipTest("test should be run as root") + else: + raise + try: + # running test as root! + self.assertEqual(socket.gethostname(), 'new') + # Should work with bytes objects too + socket.sethostname(b'bar') + self.assertEqual(socket.gethostname(), 'bar') + finally: + socket.sethostname(oldhn) + def testRefCountGetNameInfo(self): # Testing reference count for getnameinfo if hasattr(sys, "getrefcount"): @@ -744,6 +765,12 @@ class GeneralModuleTests(unittest.TestCase): fp.close() self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") + def test_pickle(self): + sock = socket.socket() + with sock: + for protocol in range(pickle.HIGHEST_PROTOCOL + 1): + self.assertRaises(TypeError, pickle.dumps, sock, protocol) + @unittest.skipUnless(thread, 'Threading required for this test.') class BasicTCPTest(SocketConnectedTest): @@ -1065,7 +1092,7 @@ class FileObjectClassTestCase(SocketConnectedTest): """ bufsize = -1 # Use default buffer size - encoding = 'utf8' + encoding = 'utf-8' errors = 'strict' newline = None @@ -1286,7 +1313,7 @@ class FileObjectInterruptedTestCase(unittest.TestCase): data = b'' else: data = '' - expecting = expecting.decode('utf8') + expecting = expecting.decode('utf-8') while len(data) != len(expecting): part = fo.read(size) if not part: @@ -1444,7 +1471,7 @@ class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): """Tests for socket.makefile() in text mode (rather than binary)""" read_mode = 'r' - read_msg = MSG.decode('utf8') + read_msg = MSG.decode('utf-8') write_mode = 'wb' write_msg = MSG newline = '' @@ -1456,7 +1483,7 @@ class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): read_mode = 'rb' read_msg = MSG write_mode = 'w' - write_msg = MSG.decode('utf8') + write_msg = MSG.decode('utf-8') newline = '' @@ -1464,9 +1491,9 @@ class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): """Tests for socket.makefile() in text mode (rather than binary)""" read_mode = 'r' - read_msg = MSG.decode('utf8') + read_msg = MSG.decode('utf-8') write_mode = 'w' - write_msg = MSG.decode('utf8') + write_msg = MSG.decode('utf-8') newline = '' diff --git a/Lib/test/test_strlit.py b/Lib/test/test_strlit.py index 9eb30e9..fb9cdbb 100644 --- a/Lib/test/test_strlit.py +++ b/Lib/test/test_strlit.py @@ -130,7 +130,7 @@ class TestLiterals(unittest.TestCase): self.assertRaises(SyntaxError, self.check_encoding, "utf-8", extra) def test_file_utf8(self): - self.check_encoding("utf8") + self.check_encoding("utf-8") def test_file_iso_8859_1(self): self.check_encoding("iso-8859-1") diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 01e670e..6523c8a 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -58,6 +58,8 @@ class BaseTestCase(unittest.TestCase): # shutdown time. That frustrates tests trying to check stderr produced # from a spawned Python process. actual = support.strip_python_stderr(stderr) + # strip_python_stderr also strips whitespace, so we do too. + expected = expected.strip() self.assertEqual(actual, expected, msg) @@ -69,6 +71,15 @@ class ProcessTestCase(BaseTestCase): "import sys; sys.exit(47)"]) self.assertEqual(rc, 47) + def test_call_timeout(self): + # call() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.call waits for the + # child. + self.assertRaises(subprocess.TimeoutExpired, subprocess.call, + [sys.executable, "-c", "while True: pass"], + timeout=0.1) + def test_check_call_zero(self): # check_call() function with zero return code rc = subprocess.check_call([sys.executable, "-c", @@ -111,6 +122,20 @@ class ProcessTestCase(BaseTestCase): self.fail("Expected ValueError when stdout arg supplied.") self.assertIn('stdout', c.exception.args[0]) + def test_check_output_timeout(self): + # check_output() function with timeout arg + with self.assertRaises(subprocess.TimeoutExpired) as c: + output = subprocess.check_output( + [sys.executable, "-c", + "import sys; sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "while True: pass"], + # Some heavily loaded buildbots (sparc Debian 3.x) require + # this much time to start and print. + timeout=3) + self.fail("Expected TimeoutExpired.") + self.assertEqual(c.exception.output, b'BDFL') + def test_call_kwargs(self): # call() function with keyword args newenv = os.environ.copy() @@ -300,6 +325,31 @@ class ProcessTestCase(BaseTestCase): rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) self.assertEqual(rc, 2) + def test_stdout_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'for i in range(10240):' + 'print("x" * 1024)'], + stdout=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdout, None) + + def test_stderr_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys\n' + 'for i in range(10240):' + 'sys.stderr.write("x" * 1024)'], + stderr=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stderr, None) + + def test_stdin_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys;' + 'sys.stdin.read(1)'], + stdin=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdin, None) + def test_cwd(self): tmpdir = tempfile.gettempdir() # We cannot use os.path.realpath to canonicalize the path, @@ -368,6 +418,41 @@ class ProcessTestCase(BaseTestCase): self.assertEqual(stdout, b"banana") self.assertStderrEqual(stderr, b"pineapple") + def test_communicate_timeout(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stderr.write("pineapple\\n");' + 'time.sleep(1);' + 'sys.stderr.write("pear\\n");' + 'sys.stdout.write(sys.stdin.read())'], + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", + timeout=0.3) + # Make sure we can keep waiting for it, and that we get the whole output + # after it completes. + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "banana") + self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") + + def test_communicate_timeout_large_ouput(self): + # Test a expring timeout while the child is outputting lots of data. + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));'], + stdout=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) + (stdout, _) = p.communicate() + self.assertEqual(len(stdout), 4 * 64 * 1024) + # Test for the fd leak reported in http://bugs.python.org/issue2791. def test_communicate_pipe_fd_leak(self): for stdin_pipe in (False, True): @@ -564,6 +649,15 @@ class ProcessTestCase(BaseTestCase): # Subsequent invocations should just return the returncode self.assertEqual(p.wait(), 0) + def test_wait_timeout(self): + p = subprocess.Popen([sys.executable, + "-c", "import time; time.sleep(0.1)"]) + with self.assertRaises(subprocess.TimeoutExpired) as c: + p.wait(timeout=0.01) + self.assertIn("0.01", str(c.exception)) # For coverage of __str__. + # Some heavily loaded buildbots (sparc Debian 3.x) require this much + # time to start. + self.assertEqual(p.wait(timeout=3), 0) def test_invalid_bufsize(self): # an invalid type of the bufsize argument should raise @@ -1064,6 +1158,11 @@ class POSIXProcessTestCase(BaseTestCase): exitcode = subprocess.call([abs_program, "-c", "pass"]) self.assertEqual(exitcode, 0) + # absolute bytes path as a string + cmd = b"'" + abs_program + b"' -c pass" + exitcode = subprocess.call(cmd, shell=True) + self.assertEqual(exitcode, 0) + # bytes program, unicode PATH env = os.environ.copy() env["PATH"] = path @@ -1214,7 +1313,7 @@ class POSIXProcessTestCase(BaseTestCase): stdout, stderr = p.communicate() self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" " non-zero with this error:\n%s" % - stderr.decode('utf8')) + stderr.decode('utf-8')) def test_select_unbuffered(self): # Issue #11459: bufsize=0 should really set the pipes as diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index db27cdf..d412e67 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -303,6 +303,7 @@ class SysModuleTest(unittest.TestCase): self.assertEqual(sys.getdlopenflags(), oldflags+1) sys.setdlopenflags(oldflags) + @test.support.refcount_test def test_refcount(self): # n here must be a global in order for this test to pass while # tracing with a python function. Tracing calls PyFrame_FastToLocals diff --git a/Lib/test/test_sys_settrace.py b/Lib/test/test_sys_settrace.py index 43134e9..acb8e29 100644 --- a/Lib/test/test_sys_settrace.py +++ b/Lib/test/test_sys_settrace.py @@ -251,6 +251,7 @@ class TraceTestCase(unittest.TestCase): def setUp(self): self.using_gc = gc.isenabled() gc.disable() + self.addCleanup(sys.settrace, sys.gettrace()) def tearDown(self): if self.using_gc: @@ -389,6 +390,9 @@ class TraceTestCase(unittest.TestCase): class RaisingTraceFuncTestCase(unittest.TestCase): + def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) + def trace(self, frame, event, arg): """A trace function that raises an exception in response to a specific trace event.""" @@ -688,6 +692,10 @@ def no_jump_without_trace_function(): class JumpTestCase(unittest.TestCase): + def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) + sys.settrace(None) + def compare_jump_output(self, expected, received): if received != expected: self.fail( "Outputs don't match:\n" + @@ -739,6 +747,8 @@ class JumpTestCase(unittest.TestCase): def test_18_no_jump_to_non_integers(self): self.run_test(no_jump_to_non_integers) def test_19_no_jump_without_trace_function(self): + # Must set sys.settrace(None) in setUp(), else condition is not + # triggered. no_jump_without_trace_function() def test_20_large_function(self): diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 68e094d..a645bf2 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -1289,7 +1289,7 @@ class UstarUnicodeTest(unittest.TestCase): self._test_unicode_filename("utf7") def test_utf8_filename(self): - self._test_unicode_filename("utf8") + self._test_unicode_filename("utf-8") def _test_unicode_filename(self, encoding): tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") @@ -1368,7 +1368,7 @@ class GNUUnicodeTest(UstarUnicodeTest): def test_bad_pax_header(self): # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields # without a hdrcharset=BINARY header. - for encoding, name in (("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"), + for encoding, name in (("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: @@ -1383,7 +1383,7 @@ class PAXUnicodeTest(UstarUnicodeTest): def test_binary_header(self): # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. - for encoding, name in (("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"), + for encoding, name in (("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: try: diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 5f99b2e..c107652 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -427,6 +427,14 @@ class ThreadTests(BaseTestCase): t.daemon = True self.assertTrue('daemon' in repr(t)) + def test_deamon_param(self): + t = threading.Thread() + self.assertFalse(t.daemon) + t = threading.Thread(daemon=False) + self.assertFalse(t.daemon) + t = threading.Thread(daemon=True) + self.assertTrue(t.daemon) + class ThreadJoinOnShutdown(BaseTestCase): @@ -677,6 +685,10 @@ class ThreadingExceptionTests(BaseTestCase): thread.start() self.assertRaises(RuntimeError, setattr, thread, "daemon", True) + def test_releasing_unacquired_lock(self): + lock = threading.Lock() + self.assertRaises(RuntimeError, lock.release) + class LockTests(lock_tests.LockTests): locktype = staticmethod(threading.Lock) diff --git a/Lib/test/test_trace.py b/Lib/test/test_trace.py index 461d1d8..d9bef38 100644 --- a/Lib/test/test_trace.py +++ b/Lib/test/test_trace.py @@ -102,6 +102,7 @@ class TracedClass(object): class TestLineCounts(unittest.TestCase): """White-box testing of line-counting, via runfunc""" def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) self.tracer = Trace(count=1, trace=0, countfuncs=0, countcallers=0) self.my_py_filename = fix_ext_py(__file__) @@ -192,6 +193,7 @@ class TestRunExecCounts(unittest.TestCase): """A simple sanity test of line-counting, via runctx (exec)""" def setUp(self): self.my_py_filename = fix_ext_py(__file__) + self.addCleanup(sys.settrace, sys.gettrace()) def test_exec_counts(self): self.tracer = Trace(count=1, trace=0, countfuncs=0, countcallers=0) @@ -218,6 +220,7 @@ class TestRunExecCounts(unittest.TestCase): class TestFuncs(unittest.TestCase): """White-box testing of funcs tracing""" def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) self.tracer = Trace(count=0, trace=0, countfuncs=1) self.filemod = my_file_and_modname() @@ -242,6 +245,8 @@ class TestFuncs(unittest.TestCase): } self.assertEqual(self.tracer.results().calledfuncs, expected) + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'pre-existing trace function throws off measurements') def test_inst_method_calling(self): obj = TracedClass(20) self.tracer.runfunc(obj.inst_method_calling, 1) @@ -257,9 +262,12 @@ class TestFuncs(unittest.TestCase): class TestCallers(unittest.TestCase): """White-box testing of callers tracing""" def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) self.tracer = Trace(count=0, trace=0, countcallers=1) self.filemod = my_file_and_modname() + @unittest.skipIf(hasattr(sys, 'gettrace') and sys.gettrace(), + 'pre-existing trace function throws off measurements') def test_loop_caller_importing(self): self.tracer.runfunc(traced_func_importing_caller, 1) @@ -280,6 +288,9 @@ class TestCallers(unittest.TestCase): # Created separately for issue #3821 class TestCoverage(unittest.TestCase): + def setUp(self): + self.addCleanup(sys.settrace, sys.gettrace()) + def tearDown(self): rmtree(TESTFN) unlink(TESTFN) diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py index 65b26c5..ace3736 100644 --- a/Lib/test/test_unicode.py +++ b/Lib/test/test_unicode.py @@ -614,7 +614,7 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('{0!s}'.format(G('data')), 'string is data') msg = 'object.__format__ with a non-empty format string is deprecated' - with support.check_warnings((msg, PendingDeprecationWarning)): + with support.check_warnings((msg, DeprecationWarning)): self.assertEqual('{0:^10}'.format(E('data')), ' E(data) ') self.assertEqual('{0:^10s}'.format(E('data')), ' E(data) ') self.assertEqual('{0:>15s}'.format(G('data')), ' string is data') @@ -1182,11 +1182,14 @@ class UnicodeTest(string_tests.CommonTest, self.assertEqual('hello'.encode('ascii'), b'hello') self.assertEqual('hello'.encode('utf-7'), b'hello') self.assertEqual('hello'.encode('utf-8'), b'hello') - self.assertEqual('hello'.encode('utf8'), b'hello') + self.assertEqual('hello'.encode('utf-8'), b'hello') self.assertEqual('hello'.encode('utf-16-le'), b'h\000e\000l\000l\000o\000') self.assertEqual('hello'.encode('utf-16-be'), b'\000h\000e\000l\000l\000o') self.assertEqual('hello'.encode('latin-1'), b'hello') + # Default encoding is utf-8 + self.assertEqual('\u2603'.encode(), b'\xe2\x98\x83') + # Roundtrip safety for BMP (just the first 1024 chars) for c in range(1024): u = chr(c) @@ -1427,7 +1430,9 @@ class UnicodeTest(string_tests.CommonTest, # Test PyUnicode_FromFormat() def test_from_format(self): support.import_module('ctypes') - from ctypes import pythonapi, py_object, c_int + from ctypes import (pythonapi, py_object, + c_int, c_long, c_longlong, c_ssize_t, + c_uint, c_ulong, c_ulonglong, c_size_t) if sys.maxunicode == 65535: name = "PyUnicodeUCS2_FromFormat" else: @@ -1452,13 +1457,40 @@ class UnicodeTest(string_tests.CommonTest, 'string, got a non-ASCII byte: 0xe9$', PyUnicode_FromFormat, b'unicode\xe9=%s', 'ascii') + # test "%c" self.assertEqual(PyUnicode_FromFormat(b'%c', c_int(0xabcd)), '\uabcd') self.assertEqual(PyUnicode_FromFormat(b'%c', c_int(0x10ffff)), '\U0010ffff') - # other tests + # test "%" + self.assertEqual(PyUnicode_FromFormat(b'%'), '%') + self.assertEqual(PyUnicode_FromFormat(b'%%'), '%') + self.assertEqual(PyUnicode_FromFormat(b'%%s'), '%s') + self.assertEqual(PyUnicode_FromFormat(b'[%%]'), '[%]') + self.assertEqual(PyUnicode_FromFormat(b'%%%s', b'abc'), '%abc') + + # test integer formats (%i, %d, %u) + self.assertEqual(PyUnicode_FromFormat(b'%03i', c_int(10)), '010') + self.assertEqual(PyUnicode_FromFormat(b'%0.4i', c_int(10)), '0010') + self.assertEqual(PyUnicode_FromFormat(b'%i', c_int(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%li', c_long(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%lli', c_longlong(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%zi', c_ssize_t(-123)), '-123') + + self.assertEqual(PyUnicode_FromFormat(b'%d', c_int(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%ld', c_long(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%lld', c_longlong(-123)), '-123') + self.assertEqual(PyUnicode_FromFormat(b'%zd', c_ssize_t(-123)), '-123') + + self.assertEqual(PyUnicode_FromFormat(b'%u', c_uint(123)), '123') + self.assertEqual(PyUnicode_FromFormat(b'%lu', c_ulong(123)), '123') + self.assertEqual(PyUnicode_FromFormat(b'%llu', c_ulonglong(123)), '123') + self.assertEqual(PyUnicode_FromFormat(b'%zu', c_size_t(123)), '123') + + # test %A text = PyUnicode_FromFormat(b'%%A:%A', 'abc\xe9\uabcd\U0010ffff') self.assertEqual(text, r"%A:'abc\xe9\uabcd\U0010ffff'") + # test %V text = PyUnicode_FromFormat(b'repr=%V', 'abc', b'xyz') self.assertEqual(text, 'repr=abc') @@ -1472,6 +1504,13 @@ class UnicodeTest(string_tests.CommonTest, text = PyUnicode_FromFormat(b'repr=%V', None, b'abc\xff') self.assertEqual(text, 'repr=abc\ufffd') + # not supported: copy the raw format string. these tests are just here + # to check for crashs and should not be considered as specifications + self.assertEqual(PyUnicode_FromFormat(b'%1%s', b'abc'), '%s') + self.assertEqual(PyUnicode_FromFormat(b'%1abc'), '%1abc') + self.assertEqual(PyUnicode_FromFormat(b'%+i', c_int(10)), '%+i') + self.assertEqual(PyUnicode_FromFormat(b'%.%s', b'abc'), '%.%s') + # Test PyUnicode_AsWideChar() def test_aswidechar(self): from _testcapi import unicode_aswidechar diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index e39fa8d..25ebeee 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -259,7 +259,7 @@ class urlretrieve_FileTests(unittest.TestCase): def constructLocalFileUrl(self, filePath): filePath = os.path.abspath(filePath) try: - filePath.encode("utf8") + filePath.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("filePath is not encodable to utf8") return "file://%s" % urllib.request.pathname2url(filePath) diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 69bcfa2..1d0d98c 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -601,7 +601,7 @@ class OpenerDirectorTests(unittest.TestCase): def sanepathname2url(path): try: - path.encode("utf8") + path.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("path is not encodable to utf8") urlpath = urllib.request.pathname2url(path) diff --git a/Lib/test/test_uuid.py b/Lib/test/test_uuid.py index 43fa656..7bc59ed 100644 --- a/Lib/test/test_uuid.py +++ b/Lib/test/test_uuid.py @@ -471,14 +471,14 @@ class TestUUID(TestCase): if pid == 0: os.close(fds[0]) value = uuid.uuid4() - os.write(fds[1], value.hex.encode('latin1')) + os.write(fds[1], value.hex.encode('latin-1')) os._exit(0) else: os.close(fds[1]) parent_value = uuid.uuid4().hex os.waitpid(pid, 0) - child_value = os.read(fds[0], 100).decode('latin1') + child_value = os.read(fds[0], 100).decode('latin-1') self.assertNotEqual(parent_value, child_value) diff --git a/Lib/test/test_xml_etree.py b/Lib/test/test_xml_etree.py index 22fafa9..40c2291 100644 --- a/Lib/test/test_xml_etree.py +++ b/Lib/test/test_xml_etree.py @@ -22,7 +22,7 @@ from xml.etree import ElementTree as ET SIMPLE_XMLFILE = findfile("simple.xml", subdir="xmltestdata") try: - SIMPLE_XMLFILE.encode("utf8") + SIMPLE_XMLFILE.encode("utf-8") except UnicodeEncodeError: raise unittest.SkipTest("filename is not encodable to utf8") SIMPLE_NS_XMLFILE = findfile("simple-ns.xml", subdir="xmltestdata") @@ -1255,8 +1255,8 @@ def processinginstruction(): >>> ET.tostring(ET.PI('test', '<testing&>')) b'<?test <testing&>?>' - >>> ET.tostring(ET.PI('test', '<testing&>\xe3'), 'latin1') - b"<?xml version='1.0' encoding='latin1'?>\\n<?test <testing&>\\xe3?>" + >>> ET.tostring(ET.PI('test', '<testing&>\xe3'), 'latin-1') + b"<?xml version='1.0' encoding='latin-1'?>\\n<?test <testing&>\\xe3?>" """ # diff --git a/Lib/test/test_xmlrpc_net.py b/Lib/test/test_xmlrpc_net.py index 5df79f0..b9853ed 100644 --- a/Lib/test/test_xmlrpc_net.py +++ b/Lib/test/test_xmlrpc_net.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -import collections +import collections.abc import errno import socket import sys @@ -48,7 +48,7 @@ class CurrentTimeTest(unittest.TestCase): # Perform a minimal sanity check on the result, just to be sure # the request means what we think it means. - self.assertIsInstance(builders, collections.Sequence) + self.assertIsInstance(builders, collections.abc.Sequence) self.assertTrue([x for x in builders if "3.x" in x], builders) diff --git a/Lib/test/test_zipimport_support.py b/Lib/test/test_zipimport_support.py index 610dfa0..f0e2fbc 100644 --- a/Lib/test/test_zipimport_support.py +++ b/Lib/test/test_zipimport_support.py @@ -163,20 +163,19 @@ class ZipSupportTests(unittest.TestCase): test_zipped_doctest.test_DocTestRunner.verbose_flag, test_zipped_doctest.test_Example, test_zipped_doctest.test_debug, - test_zipped_doctest.test_pdb_set_trace, - test_zipped_doctest.test_pdb_set_trace_nested, test_zipped_doctest.test_testsource, test_zipped_doctest.test_trailing_space_in_test, test_zipped_doctest.test_DocTestSuite, test_zipped_doctest.test_DocTestFinder, ] - # These remaining tests are the ones which need access + # These tests are the ones which need access # to the data files, so we don't run them fail_due_to_missing_data_files = [ test_zipped_doctest.test_DocFileSuite, test_zipped_doctest.test_testfile, test_zipped_doctest.test_unittest_reportflags, ] + for obj in known_good_tests: _run_object_doctest(obj, test_zipped_doctest) finally: diff --git a/Lib/threading.py b/Lib/threading.py index fd93f74..cb09afa 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -622,7 +622,7 @@ class Thread(_Verbose): #XXX __exc_clear = _sys.exc_clear def __init__(self, group=None, target=None, name=None, - args=(), kwargs=None, verbose=None): + args=(), kwargs=None, verbose=None, *, daemon=None): assert group is None, "group argument must be None for now" _Verbose.__init__(self, verbose) if kwargs is None: @@ -631,7 +631,10 @@ class Thread(_Verbose): self._name = str(name or _newname()) self._args = args self._kwargs = kwargs - self._daemonic = self._set_daemon() + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = current_thread().daemon self._ident = None self._started = Event() self._stopped = False @@ -648,10 +651,6 @@ class Thread(_Verbose): self._block.__init__() self._started._reset_internal_locks() - def _set_daemon(self): - # Overridden in _MainThread and _DummyThread - return current_thread().daemon - def __repr__(self): assert self._initialized, "Thread.__init__() was not called" status = "initial" @@ -948,15 +947,12 @@ class _Timer(Thread): class _MainThread(Thread): def __init__(self): - Thread.__init__(self, name="MainThread") + Thread.__init__(self, name="MainThread", daemon=False) self._started.set() self._set_ident() with _active_limbo_lock: _active[self._ident] = self - def _set_daemon(self): - return False - def _exitfunc(self): self._stop() t = _pickSomeNonDaemonThread() @@ -988,7 +984,7 @@ def _pickSomeNonDaemonThread(): class _DummyThread(Thread): def __init__(self): - Thread.__init__(self, name=_newname("Dummy-%d")) + Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) # Thread._block consumes an OS-level locking primitive, which # can never be used by a _DummyThread. Since a _DummyThread @@ -1000,9 +996,6 @@ class _DummyThread(Thread): with _active_limbo_lock: _active[self._ident] = self - def _set_daemon(self): - return True - def join(self, timeout=None): assert False, "cannot join a dummy thread" diff --git a/Lib/unittest/case.py b/Lib/unittest/case.py index 65af16b..4e47707 100644 --- a/Lib/unittest/case.py +++ b/Lib/unittest/case.py @@ -469,7 +469,7 @@ class TestCase(object): warnings.warn("TestResult has no addExpectedFailure method, reporting as passes", RuntimeWarning) result.addSuccess(self) - + return result finally: result.stopTest(self) if orig_result is None: @@ -938,77 +938,6 @@ class TestCase(object): standardMsg = self._truncateMessage(standardMsg, diff) self.fail(self._formatMessage(msg, standardMsg)) - def assertDictContainsSubset(self, subset, dictionary, msg=None): - """Checks whether dictionary is a superset of subset.""" - warnings.warn('assertDictContainsSubset is deprecated', - DeprecationWarning) - missing = [] - mismatched = [] - for key, value in subset.items(): - if key not in dictionary: - missing.append(key) - elif value != dictionary[key]: - mismatched.append('%s, expected: %s, actual: %s' % - (safe_repr(key), safe_repr(value), - safe_repr(dictionary[key]))) - - if not (missing or mismatched): - return - - standardMsg = '' - if missing: - standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in - missing) - if mismatched: - if standardMsg: - standardMsg += '; ' - standardMsg += 'Mismatched values: %s' % ','.join(mismatched) - - self.fail(self._formatMessage(msg, standardMsg)) - - def assertSameElements(self, expected_seq, actual_seq, msg=None): - """An unordered sequence specific comparison. - - Raises with an error message listing which elements of expected_seq - are missing from actual_seq and vice versa if any. - - Duplicate elements are ignored when comparing *expected_seq* and - *actual_seq*. It is the equivalent of ``assertEqual(set(expected), - set(actual))`` but it works with sequences of unhashable objects as - well. - """ - warnings.warn('assertSameElements is deprecated', - DeprecationWarning) - try: - expected = set(expected_seq) - actual = set(actual_seq) - missing = sorted(expected.difference(actual)) - unexpected = sorted(actual.difference(expected)) - except TypeError: - # Fall back to slower list-compare if any of the objects are - # not hashable. - expected = list(expected_seq) - actual = list(actual_seq) - try: - expected.sort() - actual.sort() - except TypeError: - missing, unexpected = unorderable_list_difference(expected, - actual) - else: - missing, unexpected = sorted_list_difference(expected, actual) - errors = [] - if missing: - errors.append('Expected, but missing:\n %s' % - safe_repr(missing)) - if unexpected: - errors.append('Unexpected, but present:\n %s' % - safe_repr(unexpected)) - if errors: - standardMsg = '\n'.join(errors) - self.fail(self._formatMessage(msg, standardMsg)) - - def assertCountEqual(self, first, second, msg=None): """An unordered sequence comparison asserting that the same elements, regardless of order. If the same element occurs more than once, @@ -1183,13 +1112,11 @@ class TestCase(object): # The fail* methods can be removed in 3.3, the 5 assert* methods will # have to stay around for a few more versions. See #9424. - failUnlessEqual = assertEquals = _deprecate(assertEqual) - failIfEqual = assertNotEquals = _deprecate(assertNotEqual) - failUnlessAlmostEqual = assertAlmostEquals = _deprecate(assertAlmostEqual) - failIfAlmostEqual = assertNotAlmostEquals = _deprecate(assertNotAlmostEqual) - failUnless = assert_ = _deprecate(assertTrue) - failUnlessRaises = _deprecate(assertRaises) - failIf = _deprecate(assertFalse) + assertEquals = _deprecate(assertEqual) + assertNotEquals = _deprecate(assertNotEqual) + assertAlmostEquals = _deprecate(assertAlmostEqual) + assertNotAlmostEquals = _deprecate(assertNotAlmostEqual) + assert_ = _deprecate(assertTrue) assertRaisesRegexp = _deprecate(assertRaisesRegex) assertRegexpMatches = _deprecate(assertRegex) diff --git a/Lib/unittest/test/_test_warnings.py b/Lib/unittest/test/_test_warnings.py index d0be18d..bfabb02 100644 --- a/Lib/unittest/test/_test_warnings.py +++ b/Lib/unittest/test/_test_warnings.py @@ -19,17 +19,12 @@ def warnfun(): warnings.warn('rw', RuntimeWarning) class TestWarnings(unittest.TestCase): - # unittest warnings will be printed at most once per type (max one message - # for the fail* methods, and one for the assert* methods) + # unittest warnings will be printed at most once per type def test_assert(self): self.assertEquals(2+2, 4) self.assertEquals(2*2, 4) self.assertEquals(2**2, 4) - def test_fail(self): - self.failUnless(1) - self.failUnless(True) - def test_other_unittest(self): self.assertAlmostEqual(2+2, 4) self.assertNotAlmostEqual(4+4, 2) diff --git a/Lib/unittest/test/test_assertions.py b/Lib/unittest/test/test_assertions.py index a1d20eb..4a646c3 100644 --- a/Lib/unittest/test/test_assertions.py +++ b/Lib/unittest/test/test_assertions.py @@ -223,15 +223,6 @@ class TestLongMessage(unittest.TestCase): "\+ \{'key': 'value'\}$", "\+ \{'key': 'value'\} : oops$"]) - def testAssertDictContainsSubset(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - - self.assertMessages('assertDictContainsSubset', ({'key': 'value'}, {}), - ["^Missing: 'key'$", "^oops$", - "^Missing: 'key'$", - "^Missing: 'key' : oops$"]) - def testAssertMultiLineEqual(self): self.assertMessages('assertMultiLineEqual', ("", "foo"), [r"\+ foo$", "^oops$", diff --git a/Lib/unittest/test/test_case.py b/Lib/unittest/test/test_case.py index 314aef3..852ac86 100644 --- a/Lib/unittest/test/test_case.py +++ b/Lib/unittest/test/test_case.py @@ -386,27 +386,62 @@ class Test_TestCase(unittest.TestCase, TestEquality, TestHashing): self.assertIsInstance(Foo().id(), str) - # "If result is omitted or None, a temporary result object is created - # and used, but is not made available to the caller. As TestCase owns the + # "If result is omitted or None, a temporary result object is created, + # used, and is made available to the caller. As TestCase owns the # temporary result startTestRun and stopTestRun are called. def test_run__uses_defaultTestResult(self): events = [] + defaultResult = LoggingResult(events) class Foo(unittest.TestCase): def test(self): events.append('test') def defaultTestResult(self): - return LoggingResult(events) + return defaultResult # Make run() find a result object on its own - Foo('test').run() + result = Foo('test').run() + self.assertIs(result, defaultResult) expected = ['startTestRun', 'startTest', 'test', 'addSuccess', 'stopTest', 'stopTestRun'] self.assertEqual(events, expected) + + # "The result object is returned to run's caller" + def test_run__returns_given_result(self): + + class Foo(unittest.TestCase): + def test(self): + pass + + result = unittest.TestResult() + + retval = Foo('test').run(result) + self.assertIs(retval, result) + + + # "The same effect [as method run] may be had by simply calling the + # TestCase instance." + def test_call__invoking_an_instance_delegates_to_run(self): + resultIn = unittest.TestResult() + resultOut = unittest.TestResult() + + class Foo(unittest.TestCase): + def test(self): + pass + + def run(self, result): + self.assertIs(result, resultIn) + return resultOut + + retval = Foo('test')(resultIn) + + self.assertIs(retval, resultOut) + + def testShortDescriptionWithoutDocstring(self): self.assertIsNone(self.shortDescription()) @@ -488,36 +523,6 @@ class Test_TestCase(unittest.TestCase, TestEquality, TestHashing): self.assertRaises(self.failureException, self.assertNotIn, 'cow', animals) - def testAssertDictContainsSubset(self): - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - - self.assertDictContainsSubset({}, {}) - self.assertDictContainsSubset({}, {'a': 1}) - self.assertDictContainsSubset({'a': 1}, {'a': 1}) - self.assertDictContainsSubset({'a': 1}, {'a': 1, 'b': 2}) - self.assertDictContainsSubset({'a': 1, 'b': 2}, {'a': 1, 'b': 2}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({1: "one"}, {}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'a': 2}, {'a': 1}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'c': 1}, {'a': 1}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'a': 1, 'c': 1}, {'a': 1}) - - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'a': 1, 'c': 1}, {'a': 1}) - - one = ''.join(chr(i) for i in range(255)) - # this used to cause a UnicodeDecodeError constructing the failure msg - with self.assertRaises(self.failureException): - self.assertDictContainsSubset({'foo': one}, {'foo': '\uFFFD'}) - def testAssertEqual(self): equal_pairs = [ ((), ()), @@ -1094,20 +1099,11 @@ test case have to stay around for a few more versions. See #9424. """ old = ( - (self.failIfEqual, (3, 5)), (self.assertNotEquals, (3, 5)), - (self.failUnlessEqual, (3, 3)), (self.assertEquals, (3, 3)), - (self.failUnlessAlmostEqual, (2.0, 2.0)), (self.assertAlmostEquals, (2.0, 2.0)), - (self.failIfAlmostEqual, (3.0, 5.0)), (self.assertNotAlmostEquals, (3.0, 5.0)), - (self.failUnless, (True,)), (self.assert_, (True,)), - (self.failUnlessRaises, (TypeError, lambda _: 3.14 + 'spam')), - (self.failIf, (False,)), - (self.assertSameElements, ([1, 1, 2, 3], [1, 2, 3])), - (self.assertDictContainsSubset, (dict(a=1, b=2), dict(a=1, b=2, c=3))), (self.assertRaisesRegexp, (KeyError, 'foo', lambda: {}['foo'])), (self.assertRegexpMatches, ('bar', 'bar')), ) @@ -1115,19 +1111,6 @@ test case with self.assertWarns(DeprecationWarning): meth(*args) - def testDeprecatedFailMethods(self): - """Test that the deprecated fail* methods get removed in 3.3""" - if sys.version_info[:2] < (3, 3): - return - deprecated_names = [ - 'failIfEqual', 'failUnlessEqual', 'failUnlessAlmostEqual', - 'failIfAlmostEqual', 'failUnless', 'failUnlessRaises', 'failIf', - 'assertSameElements', 'assertDictContainsSubset', - ] - for deprecated_name in deprecated_names: - with self.assertRaises(AttributeError): - getattr(self, deprecated_name) # remove these in 3.3 - def testDeepcopy(self): # Issue: 5660 class TestableTest(unittest.TestCase): diff --git a/Lib/unittest/test/test_loader.py b/Lib/unittest/test/test_loader.py index f7e31a5..d1b9ef5 100644 --- a/Lib/unittest/test/test_loader.py +++ b/Lib/unittest/test/test_loader.py @@ -239,7 +239,7 @@ class Test_TestLoader(unittest.TestCase): try: loader.loadTestsFromName('sdasfasfasdf') except ImportError as e: - self.assertEqual(str(e), "No module named sdasfasfasdf") + self.assertEqual(str(e), "No module named 'sdasfasfasdf'") else: self.fail("TestLoader.loadTestsFromName failed to raise ImportError") @@ -619,7 +619,7 @@ class Test_TestLoader(unittest.TestCase): try: loader.loadTestsFromNames(['sdasfasfasdf']) except ImportError as e: - self.assertEqual(str(e), "No module named sdasfasfasdf") + self.assertEqual(str(e), "No module named 'sdasfasfasdf'") else: self.fail("TestLoader.loadTestsFromNames failed to raise ImportError") diff --git a/Lib/unittest/test/test_runner.py b/Lib/unittest/test/test_runner.py index 8e95410..1bd5183 100644 --- a/Lib/unittest/test/test_runner.py +++ b/Lib/unittest/test/test_runner.py @@ -257,19 +257,17 @@ class Test_TextTestRunner(unittest.TestCase): return [b.splitlines() for b in p.communicate()] opts = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=os.path.dirname(__file__)) - ae_msg = b'Please use assertEqual instead.' - at_msg = b'Please use assertTrue instead.' # no args -> all the warnings are printed, unittest warnings only once p = subprocess.Popen([sys.executable, '_test_warnings.py'], **opts) out, err = get_parse_out_err(p) self.assertIn(b'OK', err) # check that the total number of warnings in the output is correct - self.assertEqual(len(out), 12) + self.assertEqual(len(out), 11) # check that the numbers of the different kind of warnings is correct for msg in [b'dw', b'iw', b'uw']: self.assertEqual(out.count(msg), 3) - for msg in [ae_msg, at_msg, b'rw']: + for msg in [b'rw']: self.assertEqual(out.count(msg), 1) args_list = ( @@ -294,11 +292,9 @@ class Test_TextTestRunner(unittest.TestCase): **opts) out, err = get_parse_out_err(p) self.assertIn(b'OK', err) - self.assertEqual(len(out), 14) + self.assertEqual(len(out), 13) for msg in [b'dw', b'iw', b'uw', b'rw']: self.assertEqual(out.count(msg), 3) - for msg in [ae_msg, at_msg]: - self.assertEqual(out.count(msg), 1) def testStdErrLookedUpAtInstantiationTime(self): # see issue 10786 diff --git a/Lib/urllib/request.py b/Lib/urllib/request.py index 4d3648d..a501e37 100644 --- a/Lib/urllib/request.py +++ b/Lib/urllib/request.py @@ -1852,7 +1852,7 @@ class URLopener: if encoding == 'base64': import base64 # XXX is this encoding/decoding ok? - data = base64.decodebytes(data.encode('ascii')).decode('latin1') + data = base64.decodebytes(data.encode('ascii')).decode('latin-1') else: data = unquote(data) msg.append('Content-Length: %d' % len(data)) diff --git a/Lib/urllib/response.py b/Lib/urllib/response.py index 9859642..bce2287 100644 --- a/Lib/urllib/response.py +++ b/Lib/urllib/response.py @@ -33,12 +33,15 @@ class addbase(object): id(self), self.fp) def close(self): + if self.fp: + self.fp.close() + self.fp = None self.read = None self.readline = None self.readlines = None self.fileno = None - if self.fp: self.fp.close() - self.fp = None + self.__iter__ = None + self.__next__ = None def __enter__(self): if self.fp is None: |