diff options
Diffstat (limited to 'Lib')
32 files changed, 1892 insertions, 801 deletions
diff --git a/Lib/UserString.py b/Lib/UserString.py index 500cd12..faad148 100755 --- a/Lib/UserString.py +++ b/Lib/UserString.py @@ -185,15 +185,41 @@ class MutableString(UserString): def __hash__(self): raise TypeError, "unhashable type (it is mutable)" def __setitem__(self, index, sub): - if index < 0: - index += len(self.data) - if index < 0 or index >= len(self.data): raise IndexError - self.data = self.data[:index] + sub + self.data[index+1:] + if isinstance(index, slice): + if isinstance(sub, UserString): + sub = sub.data + elif not isinstance(sub, basestring): + sub = str(sub) + start, stop, step = index.indices(len(self.data)) + if step == -1: + start, stop = stop+1, start+1 + sub = sub[::-1] + elif step != 1: + # XXX(twouters): I guess we should be reimplementing + # the extended slice assignment/deletion algorithm here... + raise TypeError, "invalid step in slicing assignment" + start = min(start, stop) + self.data = self.data[:start] + sub + self.data[stop:] + else: + if index < 0: + index += len(self.data) + if index < 0 or index >= len(self.data): raise IndexError + self.data = self.data[:index] + sub + self.data[index+1:] def __delitem__(self, index): - if index < 0: - index += len(self.data) - if index < 0 or index >= len(self.data): raise IndexError - self.data = self.data[:index] + self.data[index+1:] + if isinstance(index, slice): + start, stop, step = index.indices(len(self.data)) + if step == -1: + start, stop = stop+1, start+1 + elif step != 1: + # XXX(twouters): see same block in __setitem__ + raise TypeError, "invalid step in slicing deletion" + start = min(start, stop) + self.data = self.data[:start] + self.data[stop:] + else: + if index < 0: + index += len(self.data) + if index < 0 or index >= len(self.data): raise IndexError + self.data = self.data[:index] + self.data[index+1:] def __setslice__(self, start, end, sub): start = max(start, 0); end = max(end, 0) if isinstance(sub, UserString): diff --git a/Lib/ctypes/__init__.py b/Lib/ctypes/__init__.py index 2cddb11..cff5483 100644 --- a/Lib/ctypes/__init__.py +++ b/Lib/ctypes/__init__.py @@ -336,7 +336,7 @@ class CDLL(object): <obj>['qsort'] -> callable object Calling the functions releases the Python GIL during the call and - reaquires it afterwards. + reacquires it afterwards. """ class _FuncPtr(_CFuncPtr): _flags_ = _FUNCFLAG_CDECL diff --git a/Lib/runpy.py b/Lib/runpy.py index b463f2b..406e081 100755 --- a/Lib/runpy.py +++ b/Lib/runpy.py @@ -21,8 +21,9 @@ __all__ = [ ] -def _run_code(code, run_globals, init_globals, - mod_name, mod_fname, mod_loader): +def _run_code(code, run_globals, init_globals=None, + mod_name=None, mod_fname=None, + mod_loader=None): """Helper for _run_module_code""" if init_globals is not None: run_globals.update(init_globals) @@ -33,21 +34,31 @@ def _run_code(code, run_globals, init_globals, return run_globals def _run_module_code(code, init_globals=None, - mod_name=None, mod_fname=None, - mod_loader=None, alter_sys=False): + mod_name=None, mod_fname=None, + mod_loader=None): """Helper for run_module""" # Set up the top level namespace dictionary - if alter_sys: - # Modify sys.argv[0] and sys.modules[mod_name] - sys.argv[0] = mod_fname - module = imp.new_module(mod_name) - sys.modules[mod_name] = module - mod_globals = module.__dict__ - else: - # Leave the sys module alone - mod_globals = {} - return _run_code(code, mod_globals, init_globals, - mod_name, mod_fname, mod_loader) + temp_module = imp.new_module(mod_name) + mod_globals = temp_module.__dict__ + # Modify sys.argv[0] and sys.module[mod_name] + saved_argv0 = sys.argv[0] + restore_module = mod_name in sys.modules + if restore_module: + saved_module = sys.modules[mod_name] + sys.argv[0] = mod_fname + sys.modules[mod_name] = temp_module + try: + _run_code(code, mod_globals, init_globals, + mod_name, mod_fname, mod_loader) + finally: + sys.argv[0] = saved_argv0 + if restore_module: + sys.modules[mod_name] = saved_module + else: + del sys.modules[mod_name] + # Copy the globals of the temporary module, as they + # may be cleared when the temporary module goes away + return mod_globals.copy() # This helper is needed due to a missing component in the PEP 302 @@ -60,13 +71,8 @@ def _get_filename(loader, mod_name): else: return get_filename(mod_name) - -def run_module(mod_name, init_globals=None, - run_name=None, alter_sys=False): - """Execute a module's code without importing it - - Returns the resulting top level namespace dictionary - """ +# Helper to get the loader, code and filename for a module +def _get_module_details(mod_name): loader = get_loader(mod_name) if loader is None: raise ImportError("No module named %s" % mod_name) @@ -77,10 +83,40 @@ def run_module(mod_name, init_globals=None, if code is None: raise ImportError("No code object available for %s" % mod_name) filename = _get_filename(loader, mod_name) + return loader, code, filename + + +# XXX ncoghlan: Should this be documented and made public? +def _run_module_as_main(mod_name, set_argv0=True): + """Runs the designated module in the __main__ namespace + + These __*__ magic variables will be overwritten: + __file__ + __loader__ + """ + loader, code, fname = _get_module_details(mod_name) + main_globals = sys.modules["__main__"].__dict__ + if set_argv0: + sys.argv[0] = fname + return _run_code(code, main_globals, None, + "__main__", fname, loader) + +def run_module(mod_name, init_globals=None, + run_name=None, alter_sys=False): + """Execute a module's code without importing it + + Returns the resulting top level namespace dictionary + """ + loader, code, fname = _get_module_details(mod_name) if run_name is None: run_name = mod_name - return _run_module_code(code, init_globals, run_name, - filename, loader, alter_sys) + if alter_sys: + return _run_module_code(code, init_globals, run_name, + fname, loader) + else: + # Leave the sys module alone + return _run_code(code, {}, init_globals, + run_name, fname, loader) if __name__ == "__main__": @@ -89,4 +125,4 @@ if __name__ == "__main__": print("No module specified for execution", file=sys.stderr) else: del sys.argv[0] # Make the requested module sys.argv[0] - run_module(sys.argv[0], run_name="__main__", alter_sys=True) + _run_module_as_main(sys.argv[0]) diff --git a/Lib/socket.py b/Lib/socket.py index bffea15..fca44ea 100644 --- a/Lib/socket.py +++ b/Lib/socket.py @@ -65,6 +65,10 @@ __all__ = ["getfqdn"] __all__.extend(os._get_exports_list(_socket)) if _have_ssl: __all__.extend(os._get_exports_list(_ssl)) + def ssl(sock, keyfile=None, certfile=None): + import ssl as realssl + return realssl.sslwrap_simple(sock, keyfile, certfile) + __all__.append("ssl") # WSA error codes if sys.platform.lower().startswith("win"): diff --git a/Lib/ssl.py b/Lib/ssl.py new file mode 100644 index 0000000..99f6257 --- /dev/null +++ b/Lib/ssl.py @@ -0,0 +1,253 @@ +# Wrapper module for _ssl, providing some additional facilities +# implemented in Python. Written by Bill Janssen. + +"""\ +This module provides some more Pythonic support for SSL. + +Object types: + + sslsocket -- subtype of socket.socket which does SSL over the socket + +Exceptions: + + sslerror -- exception raised for I/O errors + +Functions: + + cert_time_to_seconds -- convert time string used for certificate + notBefore and notAfter functions to integer + seconds past the Epoch (the time values + returned from time.time()) + + fetch_server_certificate (HOST, PORT) -- fetch the certificate provided + by the server running on HOST at port PORT. No + validation of the certificate is performed. + +Integer constants: + +SSL_ERROR_ZERO_RETURN +SSL_ERROR_WANT_READ +SSL_ERROR_WANT_WRITE +SSL_ERROR_WANT_X509_LOOKUP +SSL_ERROR_SYSCALL +SSL_ERROR_SSL +SSL_ERROR_WANT_CONNECT + +SSL_ERROR_EOF +SSL_ERROR_INVALID_ERROR_CODE + +The following group define certificate requirements that one side is +allowing/requiring from the other side: + +CERT_NONE - no certificates from the other side are required (or will + be looked at if provided) +CERT_OPTIONAL - certificates are not required, but if provided will be + validated, and if validation fails, the connection will + also fail +CERT_REQUIRED - certificates are required, and will be validated, and + if validation fails, the connection will also fail + +The following constants identify various SSL protocol variants: + +PROTOCOL_SSLv2 +PROTOCOL_SSLv3 +PROTOCOL_SSLv23 +PROTOCOL_TLSv1 +""" + +import os, sys + +import _ssl # if we can't import it, let the error propagate +from socket import socket +from _ssl import sslerror +from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED +from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1 + +# Root certs: +# +# The "ca_certs" argument to sslsocket() expects a file containing one or more +# certificates that are roots of various certificate signing chains. This file +# contains the certificates in PEM format (RFC ) where each certificate is +# encoded in base64 encoding and surrounded with a header and footer: +# -----BEGIN CERTIFICATE----- +# ... (CA certificate in base64 encoding) ... +# -----END CERTIFICATE----- +# The various certificates in the file are just concatenated together: +# -----BEGIN CERTIFICATE----- +# ... (CA certificate in base64 encoding) ... +# -----END CERTIFICATE----- +# -----BEGIN CERTIFICATE----- +# ... (a second CA certificate in base64 encoding) ... +# -----END CERTIFICATE----- +# +# Some "standard" root certificates are available at +# +# http://www.thawte.com/roots/ (for Thawte roots) +# http://www.verisign.com/support/roots.html (for Verisign) + +class sslsocket (socket): + + def __init__(self, sock, keyfile=None, certfile=None, + server_side=False, cert_reqs=CERT_NONE, + ssl_version=PROTOCOL_SSLv23, ca_certs=None): + socket.__init__(self, _sock=sock._sock) + if certfile and not keyfile: + keyfile = certfile + if server_side: + self._sslobj = _ssl.sslwrap(self._sock, 1, keyfile, certfile, + cert_reqs, ssl_version, ca_certs) + else: + # see if it's connected + try: + socket.getpeername(self) + except: + # no, no connection yet + self._sslobj = None + else: + # yes, create the SSL object + self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile, + cert_reqs, ssl_version, ca_certs) + self.keyfile = keyfile + self.certfile = certfile + self.cert_reqs = cert_reqs + self.ssl_version = ssl_version + self.ca_certs = ca_certs + + def read(self, len=1024): + return self._sslobj.read(len) + + def write(self, data): + return self._sslobj.write(data) + + def getpeercert(self): + return self._sslobj.peer_certificate() + + def send (self, data, flags=0): + if flags != 0: + raise ValueError( + "non-zero flags not allowed in calls to send() on %s" % + self.__class__) + return self._sslobj.write(data) + + def send_to (self, data, addr, flags=0): + raise ValueError("send_to not allowed on instances of %s" % + self.__class__) + + def sendall (self, data, flags=0): + if flags != 0: + raise ValueError( + "non-zero flags not allowed in calls to sendall() on %s" % + self.__class__) + return self._sslobj.write(data) + + def recv (self, buflen=1024, flags=0): + if flags != 0: + raise ValueError( + "non-zero flags not allowed in calls to sendall() on %s" % + self.__class__) + return self._sslobj.read(data, buflen) + + def recv_from (self, addr, buflen=1024, flags=0): + raise ValueError("recv_from not allowed on instances of %s" % + self.__class__) + + def shutdown(self): + if self._sslobj: + self._sslobj.shutdown() + self._sslobj = None + else: + socket.shutdown(self) + + def close(self): + if self._sslobj: + self.shutdown() + else: + socket.close(self) + + def connect(self, addr): + # Here we assume that the socket is client-side, and not + # connected at the time of the call. We connect it, then wrap it. + if self._sslobj or (self.getsockname()[1] != 0): + raise ValueError("attempt to connect already-connected sslsocket!") + socket.connect(self, addr) + self._sslobj = _ssl.sslwrap(self._sock, 0, self.keyfile, self.certfile, + self.cert_reqs, self.ssl_version, + self.ca_certs) + + def accept(self): + raise ValueError("accept() not supported on an sslsocket") + + +# some utility functions + +def cert_time_to_seconds(cert_time): + import time + return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT")) + +# a replacement for the old socket.ssl function + +def sslwrap_simple (sock, keyfile=None, certfile=None): + + return _ssl.sslwrap(sock._sock, 0, keyfile, certfile, CERT_NONE, + PROTOCOL_SSLv23, None) + +# fetch the certificate that the server is providing in PEM form + +def fetch_server_certificate (host, port): + + import re, tempfile, os + + def subproc(cmd): + from subprocess import Popen, PIPE, STDOUT + proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True) + status = proc.wait() + output = proc.stdout.read() + return status, output + + def strip_to_x509_cert(certfile_contents, outfile=None): + m = re.search(r"^([-]+BEGIN CERTIFICATE[-]+[\r]*\n" + r".*[\r]*^[-]+END CERTIFICATE[-]+)$", + certfile_contents, re.MULTILINE | re.DOTALL) + if not m: + return None + else: + tn = tempfile.mktemp() + fp = open(tn, "w") + fp.write(m.group(1) + "\n") + fp.close() + try: + tn2 = (outfile or tempfile.mktemp()) + status, output = subproc(r'openssl x509 -in "%s" -out "%s"' % + (tn, tn2)) + if status != 0: + raise OperationError(status, tsig, output) + fp = open(tn2, 'rb') + data = fp.read() + fp.close() + os.unlink(tn2) + return data + finally: + os.unlink(tn) + + if sys.platform.startswith("win"): + tfile = tempfile.mktemp() + fp = open(tfile, "w") + fp.write("quit\n") + fp.close() + try: + status, output = subproc( + 'openssl s_client -connect "%s:%s" -showcerts < "%s"' % + (host, port, tfile)) + finally: + os.unlink(tfile) + else: + status, output = subproc( + 'openssl s_client -connect "%s:%s" -showcerts < /dev/null' % + (host, port)) + if status != 0: + raise OSError(status) + certtext = strip_to_x509_cert(output) + if not certtext: + raise ValueError("Invalid response received from server at %s:%s" % + (host, port)) + return certtext diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 98b774a..ffe711a 100644 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -1508,7 +1508,7 @@ class TarFile(object): if hasattr(fileobj, "mode"): self._mode = fileobj.mode self._extfileobj = True - self.name = os.path.abspath(name) + self.name = os.path.abspath(name) if name else None self.fileobj = fileobj # Init attributes. diff --git a/Lib/test/keycert.pem b/Lib/test/keycert.pem new file mode 100644 index 0000000..2f46fcf --- /dev/null +++ b/Lib/test/keycert.pem @@ -0,0 +1,32 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L +opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH +fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB +AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU +D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA +IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM +oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0 +ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/ +loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j +oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA +z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq +ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV +q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU= +-----END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD +VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x +IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT +U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1 +NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl +bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m +dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj +aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh +m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8 +M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn +fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC +AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb +08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx +CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/ +iHkC6gGdBJhogs4= +-----END CERTIFICATE----- diff --git a/Lib/test/list_tests.py b/Lib/test/list_tests.py index 0d893f2..a8a7e76 100644 --- a/Lib/test/list_tests.py +++ b/Lib/test/list_tests.py @@ -179,8 +179,10 @@ class CommonTest(seq_tests.CommonTest): self.assertEqual(a, self.type2test(range(10))) self.assertRaises(TypeError, a.__setslice__, 0, 1, 5) + self.assertRaises(TypeError, a.__setitem__, slice(0, 1, 5)) self.assertRaises(TypeError, a.__setslice__) + self.assertRaises(TypeError, a.__setitem__) def test_delslice(self): a = self.type2test([0, 1]) diff --git a/Lib/test/output/test_class b/Lib/test/output/test_class deleted file mode 100644 index d0fed75..0000000 --- a/Lib/test/output/test_class +++ /dev/null @@ -1,64 +0,0 @@ -test_class -__init__: () -__add__: (1,) -__radd__: (1,) -__sub__: (1,) -__rsub__: (1,) -__mul__: (1,) -__rmul__: (1,) -__truediv__: (1,) -__rtruediv__: (1,) -__mod__: (1,) -__rmod__: (1,) -__divmod__: (1,) -__rdivmod__: (1,) -__pow__: (1,) -__rpow__: (1,) -__rshift__: (1,) -__rrshift__: (1,) -__lshift__: (1,) -__rlshift__: (1,) -__and__: (1,) -__rand__: (1,) -__or__: (1,) -__ror__: (1,) -__xor__: (1,) -__rxor__: (1,) -__contains__: (1,) -__getitem__: (1,) -__setitem__: (1, 1) -__delitem__: (1,) -__getslice__: (0, 42) -__setslice__: (0, 42, 'The Answer') -__delslice__: (0, 42) -__getitem__: (slice(2, 1024, 10),) -__setitem__: (slice(2, 1024, 10), 'A lot') -__delitem__: (slice(2, 1024, 10),) -__getitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100),) -__setitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100), 'Strange') -__delitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100),) -__getitem__: (slice(None, 42, None),) -__setitem__: (slice(None, 42, None), 'The Answer') -__delitem__: (slice(None, 42, None),) -__neg__: () -__pos__: () -__abs__: () -__int__: () -__int__: () -__float__: () -__index__: () -__hash__: () -__repr__: () -__str__: () -__eq__: (1,) -__lt__: (1,) -__gt__: (1,) -__ne__: (1,) -__eq__: (1,) -__gt__: (1,) -__lt__: (1,) -__ne__: (1,) -__del__: () -__getattr__: ('spam',) -__setattr__: ('eggs', 'spam, spam, spam and ham') -__delattr__: ('cardinal',) diff --git a/Lib/test/output/test_frozen b/Lib/test/output/test_frozen deleted file mode 100644 index 76f17db..0000000 --- a/Lib/test/output/test_frozen +++ /dev/null @@ -1,4 +0,0 @@ -test_frozen -Hello world... -Hello world... -Hello world... diff --git a/Lib/test/output/test_ossaudiodev b/Lib/test/output/test_ossaudiodev deleted file mode 100644 index f0df5d2..0000000 --- a/Lib/test/output/test_ossaudiodev +++ /dev/null @@ -1,2 +0,0 @@ -test_ossaudiodev -playing test sound file (expected running time: 2.93 sec) diff --git a/Lib/test/output/test_signal b/Lib/test/output/test_signal deleted file mode 100644 index aa64689..0000000 --- a/Lib/test/output/test_signal +++ /dev/null @@ -1,2 +0,0 @@ -test_signal -starting pause() loop... diff --git a/Lib/test/output/test_winreg b/Lib/test/output/test_winreg deleted file mode 100644 index f47aa84..0000000 --- a/Lib/test/output/test_winreg +++ /dev/null @@ -1,3 +0,0 @@ -test_winreg -Local registry tests worked -Remote registry calls can be tested using 'test_winreg.py --remote \\machine_name' diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py index bafa23b..916a98f 100644 --- a/Lib/test/string_tests.py +++ b/Lib/test/string_tests.py @@ -935,7 +935,6 @@ class MixinStrUnicodeUserStringTest: self.checkequal('abc', 'abc', '__getitem__', slice(0, 1000)) self.checkequal('a', 'abc', '__getitem__', slice(0, 1)) self.checkequal('', 'abc', '__getitem__', slice(0, 0)) - # FIXME What about negative indices? This is handled differently by [] and __getitem__(slice) self.checkraises(TypeError, 'abc', '__getitem__', 'def') @@ -949,10 +948,21 @@ class MixinStrUnicodeUserStringTest: self.checkequal('', 'abc', '__getslice__', 1000, 1000) self.checkequal('', 'abc', '__getslice__', 2000, 1000) self.checkequal('', 'abc', '__getslice__', 2, 1) - # FIXME What about negative indizes? This is handled differently by [] and __getslice__ self.checkraises(TypeError, 'abc', '__getslice__', 'def') + def test_extended_getslice(self): + # Test extended slicing by comparing with list slicing. + s = string.ascii_letters + string.digits + indices = (0, None, 1, 3, 41, -1, -2, -37) + for start in indices: + for stop in indices: + # Skip step 0 (invalid) + for step in indices[1:]: + L = list(s)[start:stop:step] + self.checkequal("".join(L), s, '__getitem__', + slice(start, stop, step)) + def test_mul(self): self.checkequal('', 'abc', '__mul__', -1) self.checkequal('', 'abc', '__mul__', 0) diff --git a/Lib/test/test_array.py b/Lib/test/test_array.py index cf5c2e8..bae496e 100755 --- a/Lib/test/test_array.py +++ b/Lib/test/test_array.py @@ -468,6 +468,18 @@ class BaseTest(unittest.TestCase): array.array(self.typecode) ) + def test_extended_getslice(self): + # Test extended slicing by comparing with list slicing + # (Assumes list conversion works correctly, too) + a = array.array(self.typecode, self.example) + indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100) + for start in indices: + for stop in indices: + # Everything except the initial 0 (invalid step) + for step in indices[1:]: + self.assertEqual(list(a[start:stop:step]), + list(a)[start:stop:step]) + def test_setslice(self): a = array.array(self.typecode, self.example) a[:1] = a @@ -551,12 +563,34 @@ class BaseTest(unittest.TestCase): a = array.array(self.typecode, self.example) self.assertRaises(TypeError, a.__setslice__, 0, 0, None) + self.assertRaises(TypeError, a.__setitem__, slice(0, 0), None) self.assertRaises(TypeError, a.__setitem__, slice(0, 1), None) b = array.array(self.badtypecode()) self.assertRaises(TypeError, a.__setslice__, 0, 0, b) + self.assertRaises(TypeError, a.__setitem__, slice(0, 0), b) self.assertRaises(TypeError, a.__setitem__, slice(0, 1), b) + def test_extended_set_del_slice(self): + indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100) + for start in indices: + for stop in indices: + # Everything except the initial 0 (invalid step) + for step in indices[1:]: + a = array.array(self.typecode, self.example) + L = list(a) + # Make sure we have a slice of exactly the right length, + # but with (hopefully) different data. + data = L[start:stop:step] + data.reverse() + L[start:stop:step] = data + a[start:stop:step] = array.array(self.typecode, data) + self.assertEquals(a, array.array(self.typecode, L)) + + del L[start:stop:step] + del a[start:stop:step] + self.assertEquals(a, array.array(self.typecode, L)) + def test_index(self): example = 2*self.example a = array.array(self.typecode, example) diff --git a/Lib/test/test_buffer.py b/Lib/test/test_buffer.py index eb6e9ea..834c05b 100644 --- a/Lib/test/test_buffer.py +++ b/Lib/test/test_buffer.py @@ -37,6 +37,18 @@ class BufferTests(unittest.TestCase): self.failIf(a == b) self.assertRaises(TypeError, lambda: a < b) + def test_extended_getslice(self): + # Test extended slicing by comparing with list slicing. + s = bytes(range(255, -1, -1)) + b = buffer(s) + indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) + for start in indices: + for stop in indices: + # Skip step 0 (invalid) + for step in indices[1:]: + self.assertEqual(b[start:stop:step], + s[start:stop:step]) + def test_main(): test_support.run_unittest(BufferTests) diff --git a/Lib/test/test_class.py b/Lib/test/test_class.py index 003b4a5..bde63a8 100644 --- a/Lib/test/test_class.py +++ b/Lib/test/test_class.py @@ -1,6 +1,9 @@ "Test the functionality of Python classes implementing operators." -from test.test_support import TestFailed +import unittest +import sys + +from test import test_support testmeths = [ @@ -53,340 +56,540 @@ testmeths = [ # "str", # "repr", # "int", -# "long", # "float", -# "oct", -# "hex", # These are separate because they can influence the test of other methods. # "getattr", # "setattr", # "delattr", -class AllTests: - def __hash__(self, *args): - print("__hash__:", args) - return hash(id(self)) - - def __str__(self, *args): - print("__str__:", args) - return "AllTests" - - def __repr__(self, *args): - print("__repr__:", args) - return "AllTests" - - def __int__(self, *args): - print("__int__:", args) - return 1 - - def __index__(self, *args): - print("__index__:", args) - return 1 - - def __float__(self, *args): - print("__float__:", args) - return 1.0 - - def __cmp__(self, *args): - print("__cmp__:", args) - return 0 - - def __eq__(self, *args): - print("__eq__:", args) - return True - - def __ne__(self, *args): - print("__ne__:", args) - return False - - def __lt__(self, *args): - print("__lt__:", args) - return False - - def __le__(self, *args): - print("__le__:", args) - return True - - def __gt__(self, *args): - print("__gt__:", args) - return False - - def __ge__(self, *args): - print("__ge__:", args) - return True - - def __del__(self, *args): - print("__del__:", args) +callLst = [] +def trackCall(f): + def track(*args, **kwargs): + callLst.append((f.__name__, args)) + return f(*args, **kwargs) + return track + +statictests = """ +@trackCall +def __hash__(self, *args): + return hash(id(self)) + +@trackCall +def __str__(self, *args): + return "AllTests" + +@trackCall +def __repr__(self, *args): + return "AllTests" + +@trackCall +def __int__(self, *args): + return 1 + +@trackCall +def __index__(self, *args): + return 1 + +@trackCall +def __float__(self, *args): + return 1.0 + +@trackCall +def __cmp__(self, *args): + return 0 + +@trackCall +def __eq__(self, *args): + return True + +@trackCall +def __ne__(self, *args): + return False + +@trackCall +def __lt__(self, *args): + return False + +@trackCall +def __le__(self, *args): + return True + +@trackCall +def __gt__(self, *args): + return False + +@trackCall +def __ge__(self, *args): + return True +""" -# Synthesize AllTests methods from the names in testmeths. +# Synthesize all the other AllTests methods from the names in testmeths. method_template = """\ -def __%(method)s__(self, *args): - print("__%(method)s__:", args) +@trackCall +def __%s__(self, *args): + pass """ d = {} +exec(statictests, globals(), d) for method in testmeths: - exec(method_template % locals(), d) -for k in d: - setattr(AllTests, k, d[k]) -del d, k -del method, method_template - -# this also tests __init__ of course. -testme = AllTests() - -# Binary operations - -testme + 1 -1 + testme - -testme - 1 -1 - testme - -testme * 1 -1 * testme - -testme / 1 -1 / testme - -testme % 1 -1 % testme + exec(method_template % method, globals(), d) +AllTests = type("AllTests", (object,), d) +del d, statictests, method, method_template -divmod(testme,1) -divmod(1, testme) +class ClassTests(unittest.TestCase): + def setUp(self): + callLst[:] = [] -testme ** 1 -1 ** testme + def assertCallStack(self, expected_calls): + actualCallList = callLst[:] # need to copy because the comparison below will add + # additional calls to callLst + if expected_calls != actualCallList: + self.fail("Expected call list:\n %s\ndoes not match actual call list\n %s" % + (expected_calls, actualCallList)) -testme >> 1 -1 >> testme + def testInit(self): + foo = AllTests() + self.assertCallStack([("__init__", (foo,))]) -testme << 1 -1 << testme + def testBinaryOps(self): + testme = AllTests() + # Binary operations -testme & 1 -1 & testme + callLst[:] = [] + testme + 1 + self.assertCallStack([("__add__", (testme, 1))]) -testme | 1 -1 | testme + callLst[:] = [] + 1 + testme + self.assertCallStack([("__radd__", (testme, 1))]) -testme ^ 1 -1 ^ testme + callLst[:] = [] + testme - 1 + self.assertCallStack([("__sub__", (testme, 1))]) + callLst[:] = [] + 1 - testme + self.assertCallStack([("__rsub__", (testme, 1))]) -# List/dict operations - -class Empty: pass - -try: - 1 in Empty() - print('failed, should have raised TypeError') -except TypeError: - pass - -1 in testme - -testme[1] -testme[1] = 1 -del testme[1] - -testme[:42] -testme[:42] = "The Answer" -del testme[:42] + callLst[:] = [] + testme * 1 + self.assertCallStack([("__mul__", (testme, 1))]) -testme[2:1024:10] -testme[2:1024:10] = "A lot" -del testme[2:1024:10] + callLst[:] = [] + 1 * testme + self.assertCallStack([("__rmul__", (testme, 1))]) -testme[:42, ..., :24:, 24, 100] -testme[:42, ..., :24:, 24, 100] = "Strange" -del testme[:42, ..., :24:, 24, 100] + if 1/2 == 0: + callLst[:] = [] + testme / 1 + self.assertCallStack([("__div__", (testme, 1))]) -# Now remove the slice hooks to see if converting normal slices to slice -# object works. + callLst[:] = [] + 1 / testme + self.assertCallStack([("__rdiv__", (testme, 1))]) -del AllTests.__getslice__ -del AllTests.__setslice__ -del AllTests.__delslice__ - -import sys -if sys.platform[:4] != 'java': - testme[:42] - testme[:42] = "The Answer" - del testme[:42] -else: - # This works under Jython, but the actual slice values are - # different. - print("__getitem__: (slice(0, 42, None),)") - print("__setitem__: (slice(0, 42, None), 'The Answer')") - print("__delitem__: (slice(0, 42, None),)") - -# Unary operations - --testme -+testme -abs(testme) -int(testme) -int(testme) -float(testme) -oct(testme) - -# And the rest... - -hash(testme) -repr(testme) -str(testme) - -testme == 1 -testme < 1 -testme > 1 -testme != 1 -1 == testme -1 < testme -1 > testme -1 != testme - -# This test has to be last (duh.) - -del testme -if sys.platform[:4] == 'java': - import java - java.lang.System.gc() - -# Interfering tests - -class ExtraTests: - def __getattr__(self, *args): - print("__getattr__:", args) - return "SomeVal" - - def __setattr__(self, *args): - print("__setattr__:", args) - - def __delattr__(self, *args): - print("__delattr__:", args) - -testme = ExtraTests() -testme.spam -testme.eggs = "spam, spam, spam and ham" -del testme.cardinal - - -# return values of some method are type-checked -class BadTypeClass: - def __int__(self): - return None - __float__ = __int__ - __str__ = __int__ - __repr__ = __int__ - -def check_exc(stmt, exception): - """Raise TestFailed if executing 'stmt' does not raise 'exception' - """ - try: - exec(stmt) - except exception: - pass - else: - raise TestFailed, "%s should raise %s" % (stmt, exception) - -check_exc("int(BadTypeClass())", TypeError) -check_exc("float(BadTypeClass())", TypeError) -check_exc("str(BadTypeClass())", TypeError) -check_exc("repr(BadTypeClass())", TypeError) -check_exc("oct(BadTypeClass())", TypeError) -check_exc("hex(BadTypeClass())", TypeError) - -# Test correct errors from hash() on objects with comparisons but no __hash__ - -class C0: - pass + callLst[:] = [] + testme % 1 + self.assertCallStack([("__mod__", (testme, 1))]) -hash(C0()) # This should work; the next two should raise TypeError + callLst[:] = [] + 1 % testme + self.assertCallStack([("__rmod__", (testme, 1))]) -class C1: - def __cmp__(self, other): return 0 -check_exc("hash(C1())", TypeError) + callLst[:] = [] + divmod(testme,1) + self.assertCallStack([("__divmod__", (testme, 1))]) -class C2: - def __eq__(self, other): return 1 + callLst[:] = [] + divmod(1, testme) + self.assertCallStack([("__rdivmod__", (testme, 1))]) -check_exc("hash(C2())", TypeError) - -# Test for SF bug 532646 - -class A: - pass -A.__call__ = A() -a = A() -try: - a() # This should not segfault -except RuntimeError: - pass -else: - raise TestFailed, "how could this not have overflowed the stack?" - - -# Tests for exceptions raised in instance_getattr2(). - -def booh(self): - raise AttributeError, "booh" - -class A: - a = property(booh) -try: - A().a # Raised AttributeError: A instance has no attribute 'a' -except AttributeError as x: - if str(x) != "booh": - print("attribute error for A().a got masked:", str(x)) - -class E: - __eq__ = property(booh) -E() == E() # In debug mode, caused a C-level assert() to fail - -class I: - __init__ = property(booh) -try: - I() # In debug mode, printed XXX undetected error and raises AttributeError -except AttributeError as x: - pass -else: - print("attribute error for I.__init__ got masked") - - -# Test comparison and hash of methods -class A: - def __init__(self, x): - self.x = x - def f(self): - pass - def g(self): - pass - def __eq__(self, other): - return self.x == other.x - def __hash__(self): - return self.x -class B(A): - pass + callLst[:] = [] + testme ** 1 + self.assertCallStack([("__pow__", (testme, 1))]) -a1 = A(1) -a2 = A(2) -assert a1.f == a1.f -assert a1.f != a2.f -assert a1.f != a1.g -assert a1.f == A(1).f -assert hash(a1.f) == hash(a1.f) -assert hash(a1.f) == hash(A(1).f) - -assert A.f != a1.f -assert A.f != A.g -assert B.f == A.f -assert hash(B.f) == hash(A.f) - -# the following triggers a SystemError in 2.4 -a = A(hash(A.f.im_func)^(-1)) -hash(a.f) + callLst[:] = [] + 1 ** testme + self.assertCallStack([("__rpow__", (testme, 1))]) + + callLst[:] = [] + testme >> 1 + self.assertCallStack([("__rshift__", (testme, 1))]) + + callLst[:] = [] + 1 >> testme + self.assertCallStack([("__rrshift__", (testme, 1))]) + + callLst[:] = [] + testme << 1 + self.assertCallStack([("__lshift__", (testme, 1))]) + + callLst[:] = [] + 1 << testme + self.assertCallStack([("__rlshift__", (testme, 1))]) + + callLst[:] = [] + testme & 1 + self.assertCallStack([("__and__", (testme, 1))]) + + callLst[:] = [] + 1 & testme + self.assertCallStack([("__rand__", (testme, 1))]) + + callLst[:] = [] + testme | 1 + self.assertCallStack([("__or__", (testme, 1))]) + + callLst[:] = [] + 1 | testme + self.assertCallStack([("__ror__", (testme, 1))]) + + callLst[:] = [] + testme ^ 1 + self.assertCallStack([("__xor__", (testme, 1))]) + + callLst[:] = [] + 1 ^ testme + self.assertCallStack([("__rxor__", (testme, 1))]) + + def testListAndDictOps(self): + testme = AllTests() + + # List/dict operations + + class Empty: pass + + try: + 1 in Empty() + self.fail('failed, should have raised TypeError') + except TypeError: + pass + + callLst[:] = [] + 1 in testme + self.assertCallStack([('__contains__', (testme, 1))]) + + callLst[:] = [] + testme[1] + self.assertCallStack([('__getitem__', (testme, 1))]) + + callLst[:] = [] + testme[1] = 1 + self.assertCallStack([('__setitem__', (testme, 1, 1))]) + + callLst[:] = [] + del testme[1] + self.assertCallStack([('__delitem__', (testme, 1))]) + + callLst[:] = [] + testme[:42] + self.assertCallStack([('__getslice__', (testme, 0, 42))]) + + callLst[:] = [] + testme[:42] = "The Answer" + self.assertCallStack([('__setslice__', (testme, 0, 42, "The Answer"))]) + + callLst[:] = [] + del testme[:42] + self.assertCallStack([('__delslice__', (testme, 0, 42))]) + + callLst[:] = [] + testme[2:1024:10] + self.assertCallStack([('__getitem__', (testme, slice(2, 1024, 10)))]) + + callLst[:] = [] + testme[2:1024:10] = "A lot" + self.assertCallStack([('__setitem__', (testme, slice(2, 1024, 10), + "A lot"))]) + callLst[:] = [] + del testme[2:1024:10] + self.assertCallStack([('__delitem__', (testme, slice(2, 1024, 10)))]) + + callLst[:] = [] + testme[:42, ..., :24:, 24, 100] + self.assertCallStack([('__getitem__', (testme, (slice(None, 42, None), + Ellipsis, + slice(None, 24, None), + 24, 100)))]) + callLst[:] = [] + testme[:42, ..., :24:, 24, 100] = "Strange" + self.assertCallStack([('__setitem__', (testme, (slice(None, 42, None), + Ellipsis, + slice(None, 24, None), + 24, 100), "Strange"))]) + callLst[:] = [] + del testme[:42, ..., :24:, 24, 100] + self.assertCallStack([('__delitem__', (testme, (slice(None, 42, None), + Ellipsis, + slice(None, 24, None), + 24, 100)))]) + + # Now remove the slice hooks to see if converting normal slices to + # slice object works. + + getslice = AllTests.__getslice__ + del AllTests.__getslice__ + setslice = AllTests.__setslice__ + del AllTests.__setslice__ + delslice = AllTests.__delslice__ + del AllTests.__delslice__ + + # XXX when using new-style classes the slice testme[:42] produces + # slice(None, 42, None) instead of slice(0, 42, None). py3k will have + # to change this test. + callLst[:] = [] + testme[0:42] + self.assertCallStack([('__getitem__', (testme, slice(0, 42, None)))]) + + callLst[:] = [] + testme[:42] = "The Answer" + self.assertCallStack([('__setitem__', (testme, slice(None, 42, None), + "The Answer"))]) + callLst[:] = [] + del testme[0:42] + self.assertCallStack([('__delitem__', (testme, slice(0, 42, None)))]) + + # Restore the slice methods, or the tests will fail with regrtest -R. + AllTests.__getslice__ = getslice + AllTests.__setslice__ = setslice + AllTests.__delslice__ = delslice + + + def testUnaryOps(self): + testme = AllTests() + + callLst[:] = [] + -testme + self.assertCallStack([('__neg__', (testme,))]) + callLst[:] = [] + +testme + self.assertCallStack([('__pos__', (testme,))]) + callLst[:] = [] + abs(testme) + self.assertCallStack([('__abs__', (testme,))]) + callLst[:] = [] + int(testme) + self.assertCallStack([('__int__', (testme,))]) + callLst[:] = [] + float(testme) + self.assertCallStack([('__float__', (testme,))]) + callLst[:] = [] + oct(testme) + self.assertCallStack([('__index__', (testme,))]) + callLst[:] = [] + hex(testme) + self.assertCallStack([('__index__', (testme,))]) + + + def testMisc(self): + testme = AllTests() + + callLst[:] = [] + hash(testme) + self.assertCallStack([('__hash__', (testme,))]) + + callLst[:] = [] + repr(testme) + self.assertCallStack([('__repr__', (testme,))]) + + callLst[:] = [] + str(testme) + self.assertCallStack([('__str__', (testme,))]) + + callLst[:] = [] + testme == 1 + self.assertCallStack([('__eq__', (testme, 1))]) + + callLst[:] = [] + testme < 1 + self.assertCallStack([('__lt__', (testme, 1))]) + + callLst[:] = [] + testme > 1 + self.assertCallStack([('__gt__', (testme, 1))]) + + callLst[:] = [] + testme != 1 + self.assertCallStack([('__ne__', (testme, 1))]) + + callLst[:] = [] + 1 == testme + self.assertCallStack([('__eq__', (1, testme))]) + + callLst[:] = [] + 1 < testme + self.assertCallStack([('__gt__', (1, testme))]) + + callLst[:] = [] + 1 > testme + self.assertCallStack([('__lt__', (1, testme))]) + + callLst[:] = [] + 1 != testme + self.assertCallStack([('__ne__', (1, testme))]) + + + def testGetSetAndDel(self): + # Interfering tests + class ExtraTests(AllTests): + @trackCall + def __getattr__(self, *args): + return "SomeVal" + + @trackCall + def __setattr__(self, *args): + pass + + @trackCall + def __delattr__(self, *args): + pass + + testme = ExtraTests() + + callLst[:] = [] + testme.spam + self.assertCallStack([('__getattr__', (testme, "spam"))]) + + callLst[:] = [] + testme.eggs = "spam, spam, spam and ham" + self.assertCallStack([('__setattr__', (testme, "eggs", + "spam, spam, spam and ham"))]) + + callLst[:] = [] + del testme.cardinal + self.assertCallStack([('__delattr__', (testme, "cardinal"))]) + + def testDel(self): + x = [] + + class DelTest: + def __del__(self): + x.append("crab people, crab people") + testme = DelTest() + del testme + import gc + gc.collect() + self.assertEquals(["crab people, crab people"], x) + + def testBadTypeReturned(self): + # return values of some method are type-checked + class BadTypeClass: + def __int__(self): + return None + __float__ = __int__ + __str__ = __int__ + __repr__ = __int__ + __oct__ = __int__ + __hex__ = __int__ + + for f in [int, float, str, repr, oct, hex]: + self.assertRaises(TypeError, f, BadTypeClass()) + + def testHashStuff(self): + # Test correct errors from hash() on objects with comparisons but + # no __hash__ + + class C0: + pass + + hash(C0()) # This should work; the next two should raise TypeError + + class C1: + def __cmp__(self, other): return 0 + + self.assertRaises(TypeError, hash, C1()) + + class C2: + def __eq__(self, other): return 1 + + self.assertRaises(TypeError, hash, C2()) + + + def testSFBug532646(self): + # Test for SF bug 532646 + + class A: + pass + A.__call__ = A() + a = A() + + try: + a() # This should not segfault + except RuntimeError: + pass + else: + self.fail("Failed to raise RuntimeError") + + def testForExceptionsRaisedInInstanceGetattr2(self): + # Tests for exceptions raised in instance_getattr2(). + + def booh(self): + raise AttributeError("booh") + + class A: + a = property(booh) + try: + A().a # Raised AttributeError: A instance has no attribute 'a' + except AttributeError as x: + if str(x) != "booh": + self.fail("attribute error for A().a got masked: %s" % x) + + class E: + __eq__ = property(booh) + E() == E() # In debug mode, caused a C-level assert() to fail + + class I: + __init__ = property(booh) + try: + # In debug mode, printed XXX undetected error and + # raises AttributeError + I() + except AttributeError as x: + pass + else: + self.fail("attribute error for I.__init__ got masked") + + def testHashComparisonOfMethods(self): + # Test comparison and hash of methods + class A: + def __init__(self, x): + self.x = x + def f(self): + pass + def g(self): + pass + def __eq__(self, other): + return self.x == other.x + def __hash__(self): + return self.x + class B(A): + pass + + a1 = A(1) + a2 = A(2) + self.assertEquals(a1.f, a1.f) + self.assertNotEquals(a1.f, a2.f) + self.assertNotEquals(a1.f, a1.g) + self.assertEquals(a1.f, A(1).f) + self.assertEquals(hash(a1.f), hash(a1.f)) + self.assertEquals(hash(a1.f), hash(A(1).f)) + + self.assertNotEquals(A.f, a1.f) + self.assertNotEquals(A.f, A.g) + self.assertEquals(B.f, A.f) + self.assertEquals(hash(B.f), hash(A.f)) + + # the following triggers a SystemError in 2.4 + a = A(hash(A.f.im_func)^(-1)) + hash(a.f) + +def test_main(): + test_support.run_unittest(ClassTests) + +if __name__=='__main__': + test_main() diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py index cb1bc9ff..441df09 100644 --- a/Lib/test/test_cmd_line.py +++ b/Lib/test/test_cmd_line.py @@ -3,18 +3,25 @@ import test.test_support, unittest import sys import subprocess +def _spawn_python(*args): + cmd_line = [sys.executable] + cmd_line.extend(args) + return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + +def _kill_python(p): + p.stdin.close() + data = p.stdout.read() + p.stdout.close() + # try to cleanup the child so we don't appear to leak when running + # with regrtest -R. This should be a no-op on Windows. + subprocess._cleanup() + return data + class CmdLineTest(unittest.TestCase): - def start_python(self, cmd_line): - cmd = '"%s" %s' % (sys.executable, cmd_line) - p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - p.stdin.close() - data = p.stdout.read() - p.stdout.close() - # try to cleanup the child so we don't appear to leak when running - # with regrtest -R. This should be a no-op on Windows. - subprocess._cleanup() - return data + def start_python(self, *args): + p = _spawn_python(*args) + return _kill_python(p) def exit_code(self, *args): cmd_line = [sys.executable] @@ -72,6 +79,17 @@ class CmdLineTest(unittest.TestCase): self.exit_code('-m', 'timeit', '-n', '1'), 0) + def test_run_module_bug1764407(self): + # -m and -i need to play well together + # Runs the timeit module and checks the __main__ + # namespace has been populated appropriately + p = _spawn_python('-i', '-m', 'timeit', '-n', '1') + p.stdin.write('Timer\n') + p.stdin.write('exit()\n') + data = _kill_python(p) + self.assertTrue(data.find(b'1 loop') != -1) + self.assertTrue(data.find(b'__main__.Timer') != -1) + def test_run_code(self): # Test expected operation of the '-c' switch # Switch needs an argument diff --git a/Lib/test/test_frozen.py b/Lib/test/test_frozen.py index 5387e56..e47bb64 100644 --- a/Lib/test/test_frozen.py +++ b/Lib/test/test_frozen.py @@ -1,27 +1,40 @@ # Test the frozen module defined in frozen.c. +from __future__ import with_statement -from test.test_support import TestFailed +from test.test_support import captured_stdout, run_unittest +import unittest import sys, os -try: - import __hello__ -except ImportError as x: - raise TestFailed, "import __hello__ failed:" + str(x) - -try: - import __phello__ -except ImportError as x: - raise TestFailed, "import __phello__ failed:" + str(x) - -try: - import __phello__.spam -except ImportError as x: - raise TestFailed, "import __phello__.spam failed:" + str(x) - -if sys.platform != "mac": # On the Mac this import does succeed. - try: - import __phello__.foo - except ImportError: - pass - else: - raise TestFailed, "import __phello__.foo should have failed" +class FrozenTests(unittest.TestCase): + def test_frozen(self): + + with captured_stdout() as stdout: + try: + import __hello__ + except ImportError as x: + self.fail("import __hello__ failed:" + str(x)) + + try: + import __phello__ + except ImportError as x: + self.fail("import __phello__ failed:" + str(x)) + + try: + import __phello__.spam + except ImportError as x: + self.fail("import __phello__.spam failed:" + str(x)) + + if sys.platform != "mac": # On the Mac this import does succeed. + try: + import __phello__.foo + except ImportError: + pass + else: + self.fail("import __phello__.foo should have failed") + + self.assertEquals(stdout.getvalue(), + 'Hello world...\nHello world...\nHello world...\n') + + +def test_main(): + run_unittest(FrozenTests) diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py index b0f9e9e..f2232e4 100644 --- a/Lib/test/test_mmap.py +++ b/Lib/test/test_mmap.py @@ -305,6 +305,40 @@ class MmapTests(unittest.TestCase): m[x] = b self.assertEqual(m[x], b) + def test_extended_getslice(self): + # Test extended slicing by comparing with list slicing. + s = bytes(reversed(range(256))) + m = mmap.mmap(-1, len(s)) + m[:] = s + self.assertEqual(m[:], s) + indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) + for start in indices: + for stop in indices: + # Skip step 0 (invalid) + for step in indices[1:]: + self.assertEqual(m[start:stop:step], + s[start:stop:step]) + + def test_extended_set_del_slice(self): + # Test extended slicing by comparing with list slicing. + s = bytes(reversed(range(256))) + m = mmap.mmap(-1, len(s)) + indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) + for start in indices: + for stop in indices: + # Skip invalid step 0 + for step in indices[1:]: + m[:] = s + self.assertEqual(m[:], s) + L = list(s) + # Make sure we have a slice of exactly the right length, + # but with different data. + data = L[start:stop:step] + data = bytes(reversed(data)) + L[start:stop:step] = data + m[start:stop:step] = data + self.assertEquals(m[:], bytes(L)) + def test_main(): run_unittest(MmapTests) diff --git a/Lib/test/test_ossaudiodev.py b/Lib/test/test_ossaudiodev.py index 8da22eb..97d5989 100644 --- a/Lib/test/test_ossaudiodev.py +++ b/Lib/test/test_ossaudiodev.py @@ -77,8 +77,7 @@ class OSSAudioDevTests(unittest.TestCase): # set parameters based on .au file headers dsp.setparameters(AFMT_S16_NE, nchannels, rate) - print ("playing test sound file (expected running time: %.2f sec)" - % expected_time) + self.assertEquals("%.2f" % expected_time, "2.93") t1 = time.time() dsp.write(data) dsp.close() @@ -121,7 +120,6 @@ class OSSAudioDevTests(unittest.TestCase): "setparameters%r: returned %r" % (config, result)) def set_bad_parameters(self, dsp): - # Now try some configurations that are presumably bogus: eg. 300 # channels currently exceeds even Hollywood's ambitions, and # negative sampling rate is utter nonsense. setparameters() should @@ -166,7 +164,7 @@ class OSSAudioDevTests(unittest.TestCase): def test_main(): try: dsp = ossaudiodev.open('w') - except IOError as msg: + except (ossaudiodev.error, IOError) as msg: if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY): raise TestSkipped(msg) diff --git a/Lib/test/test_pkg.py b/Lib/test/test_pkg.py index 45d0f4f..1e32d2c 100644 --- a/Lib/test/test_pkg.py +++ b/Lib/test/test_pkg.py @@ -155,7 +155,8 @@ class Test(unittest.TestCase): ("t4 sub.py", "raise RuntimeError('Shouldnt load sub.py')"), ("t4 sub", None), ("t4 sub __init__.py", ""), - ("t4 sub subsub.py", "raise RuntimeError('Shouldnt load subsub.py')"), + ("t4 sub subsub.py", + "raise RuntimeError('Shouldnt load subsub.py')"), ("t4 sub subsub", None), ("t4 sub subsub __init__.py", "spam = 1"), ] @@ -196,7 +197,8 @@ class Test(unittest.TestCase): def test_6(self): hier = [ ("t6", None), - ("t6 __init__.py", "__all__ = ['spam', 'ham', 'eggs']"), + ("t6 __init__.py", + "__all__ = ['spam', 'ham', 'eggs']"), ("t6 spam.py", ""), ("t6 ham.py", ""), ("t6 eggs.py", ""), @@ -223,10 +225,11 @@ class Test(unittest.TestCase): ("t7.py", ""), ("t7", None), ("t7 __init__.py", ""), - ("t7 sub.py", "raise RuntimeError('Shouldnt load sub.py')"), + ("t7 sub.py", + "raise RuntimeError('Shouldnt load sub.py')"), ("t7 sub", None), ("t7 sub __init__.py", ""), - ("t7 sub subsub.py", + ("t7 sub .py", "raise RuntimeError('Shouldnt load subsub.py')"), ("t7 sub subsub", None), ("t7 sub subsub __init__.py", diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py index b628d77..b003f17 100644 --- a/Lib/test/test_runpy.py +++ b/Lib/test/test_runpy.py @@ -5,7 +5,7 @@ import os.path import sys import tempfile from test.test_support import verbose, run_unittest, forget -from runpy import _run_module_code, run_module +from runpy import _run_code, _run_module_code, _run_module_as_main, run_module # Set up the test code and expected results @@ -29,6 +29,16 @@ class RunModuleCodeTest(unittest.TestCase): "nested = runpy._run_module_code('x=1\\n', mod_name='<run>')\n" ) + def test_run_code(self): + saved_argv0 = sys.argv[0] + d = _run_code(self.test_source, {}) + self.failUnless(d["result"] == self.expected_result) + self.failUnless(d["__name__"] is None) + self.failUnless(d["__file__"] is None) + self.failUnless(d["__loader__"] is None) + self.failUnless(d["run_argv0"] is saved_argv0) + self.failUnless("run_name" not in d) + self.failUnless(sys.argv[0] is saved_argv0) def test_run_module_code(self): initial = object() @@ -37,44 +47,24 @@ class RunModuleCodeTest(unittest.TestCase): loader = "Now you're just being silly" d1 = dict(initial=initial) saved_argv0 = sys.argv[0] - try: - d2 = _run_module_code(self.test_source, - d1, - name, - file, - loader, - alter_sys=True) - self.failUnless("result" not in d1) - self.failUnless(d2["initial"] is initial) - self.assertEqual(d2["result"], self.expected_result) - self.assertEqual(d2["nested"]["x"], 1) - self.assertEqual(d2["nested"]["__name__"], "<run>") - self.failUnless(d2["__name__"] is name) - self.failUnless(d2["__file__"] is file) - self.failUnless(d2["__loader__"] is loader) - self.failUnless(d2["run_argv0"] is file) - self.failUnless(d2["run_name_in_sys_modules"]) - self.failUnless(d2["module_in_sys_modules"]) - self.failUnless(sys.argv[0] is not saved_argv0) - self.failUnless(name in sys.modules) - finally: - sys.argv[0] = saved_argv0 - if name in sys.modules: - del sys.modules[name] - - def test_run_module_code_defaults(self): - saved_argv0 = sys.argv[0] - d = _run_module_code(self.test_source) - self.assertEqual(d["result"], self.expected_result) - self.failUnless(d["nested"]["x"] == 1) - self.failUnless(d["nested"]["__name__"] == "<run>") - self.failUnless(d["__name__"] is None) - self.failUnless(d["__file__"] is None) - self.failUnless(d["__loader__"] is None) - self.failUnless(d["run_argv0"] is saved_argv0) - self.failUnless(not d["run_name_in_sys_modules"]) + d2 = _run_module_code(self.test_source, + d1, + name, + file, + loader) + self.failUnless("result" not in d1) + self.failUnless(d2["initial"] is initial) + self.assertEqual(d2["result"], self.expected_result) + self.assertEqual(d2["nested"]["x"], 1) + self.failUnless(d2["__name__"] is name) + self.failUnless(d2["run_name_in_sys_modules"]) + self.failUnless(d2["module_in_sys_modules"]) + self.failUnless(d2["__file__"] is file) + self.failUnless(d2["run_argv0"] is file) + self.failUnless(d2["__loader__"] is loader) self.failUnless(sys.argv[0] is saved_argv0) - self.failUnless(None not in sys.modules) + self.failUnless(name not in sys.modules) + class RunModuleTest(unittest.TestCase): diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 15c6e9c..c1df51e 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,167 +1,178 @@ -# Test the signal module -from test.test_support import verbose, TestSkipped, TestFailed, vereq +import unittest +from test import test_support import signal import os, sys, time -if sys.platform[:3] in ('win', 'os2'): - raise TestSkipped, "Can't test signal on %s" % sys.platform - -MAX_DURATION = 20 # Entire test should last at most 20 sec. - -if verbose: - x = '-x' -else: - x = '+x' - -pid = os.getpid() -if verbose: - print("test runner's pid is", pid) - -# Shell script that will send us asynchronous signals -script = """ - ( - set %(x)s - sleep 2 - kill -HUP %(pid)d - sleep 2 - kill -USR1 %(pid)d - sleep 2 - kill -USR2 %(pid)d - ) & -""" % vars() - -a_called = b_called = False - -def handlerA(*args): - global a_called - a_called = True - if verbose: - print("handlerA invoked", args) - class HandlerBCalled(Exception): pass -def handlerB(*args): - global b_called - b_called = True - if verbose: - print("handlerB invoked", args) - raise HandlerBCalled, args - -# Set up a child to send signals to us (the parent) after waiting long -# enough to receive the alarm. It seems we miss the alarm for some -# reason. This will hopefully stop the hangs on Tru64/Alpha. -# Alas, it doesn't. Tru64 appears to miss all the signals at times, or -# seemingly random subsets of them, and nothing done in force_test_exit -# so far has actually helped. -def force_test_exit(): - # Sigh, both imports seem necessary to avoid errors. - import os - fork_pid = os.fork() - if fork_pid: - # In parent. - return fork_pid - - # In child. - import os, time - try: - # Wait 5 seconds longer than the expected alarm to give enough - # time for the normal sequence of events to occur. This is - # just a stop-gap to try to prevent the test from hanging. - time.sleep(MAX_DURATION + 5) - print(' child should not have to kill parent', file=sys.__stdout__) - for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM": - os.kill(pid, getattr(signal, signame)) - print(" child sent", signame, "to", pid, file=sys.__stdout__) - time.sleep(1) - finally: - os._exit(0) - -# Install handlers. -hup = signal.signal(signal.SIGHUP, handlerA) -usr1 = signal.signal(signal.SIGUSR1, handlerB) -usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN) -alrm = signal.signal(signal.SIGALRM, signal.default_int_handler) - -try: - - signal.alarm(MAX_DURATION) - vereq(signal.getsignal(signal.SIGHUP), handlerA) - vereq(signal.getsignal(signal.SIGUSR1), handlerB) - vereq(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN) - vereq(signal.getsignal(signal.SIGALRM), signal.default_int_handler) - - # Try to ensure this test exits even if there is some problem with alarm. - # Tru64/Alpha often hangs and is ultimately killed by the buildbot. - fork_pid = force_test_exit() - - try: - signal.getsignal(4242) - raise TestFailed('expected ValueError for invalid signal # to ' - 'getsignal()') - except ValueError: - pass - - try: - signal.signal(4242, handlerB) - raise TestFailed('expected ValueError for invalid signal # to ' - 'signal()') - except ValueError: - pass - - try: - signal.signal(signal.SIGUSR1, None) - raise TestFailed('expected TypeError for non-callable') - except TypeError: - pass - - # Launch an external script to send us signals. - # We expect the external script to: - # send HUP, which invokes handlerA to set a_called - # send USR1, which invokes handlerB to set b_called and raise - # HandlerBCalled - # send USR2, which is ignored - # - # Then we expect the alarm to go off, and its handler raises - # KeyboardInterrupt, finally getting us out of the loop. - os.system(script) - try: - print("starting pause() loop...") - while 1: - try: - if verbose: - print("call pause()...") - signal.pause() - if verbose: - print("pause() returned") - except HandlerBCalled: - if verbose: - print("HandlerBCalled exception caught") - - except KeyboardInterrupt: - if verbose: - print("KeyboardInterrupt (the alarm() went off)") - - if not a_called: - print('HandlerA not called') - - if not b_called: - print('HandlerB not called') - -finally: - # Forcibly kill the child we created to ping us if there was a test error. - try: - # Make sure we don't kill ourself if there was a fork error. - if fork_pid > 0: - os.kill(fork_pid, signal.SIGKILL) - except: - # If the child killed us, it has probably exited. Killing a - # non-existent process will raise an error which we don't care about. - pass - - # Restore handlers. - signal.alarm(0) # cancel alarm in case we died early - signal.signal(signal.SIGHUP, hup) - signal.signal(signal.SIGUSR1, usr1) - signal.signal(signal.SIGUSR2, usr2) - signal.signal(signal.SIGALRM, alrm) +class InterProcessSignalTests(unittest.TestCase): + MAX_DURATION = 20 # Entire test should last at most 20 sec. + + # Set up a child to send signals to us (the parent) after waiting + # long enough to receive the alarm. It seems we miss the alarm + # for some reason. This will hopefully stop the hangs on + # Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the + # signals at times, or seemingly random subsets of them, and + # nothing done in force_test_exit so far has actually helped. + def spawn_force_test_exit_process(self, parent_pid): + # Sigh, both imports seem necessary to avoid errors. + import os + fork_pid = os.fork() + if fork_pid: + # In parent. + return fork_pid + + # In child. + import os, time + try: + # Wait 5 seconds longer than the expected alarm to give enough + # time for the normal sequence of events to occur. This is + # just a stop-gap to try to prevent the test from hanging. + time.sleep(self.MAX_DURATION + 5) + print(" child should not have to kill parent", + file=sys.__stdout__) + for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM": + os.kill(parent_pid, getattr(signal, signame)) + print(" child sent", signame, "to", + parent_pid, file=sys.__stdout__) + time.sleep(1) + finally: + os._exit(0) + + def handlerA(self, *args): + self.a_called = True + if test_support.verbose: + print("handlerA invoked", args) + + def handlerB(self, *args): + self.b_called = True + if test_support.verbose: + print("handlerB invoked", args) + raise HandlerBCalled(*args) + + def test_main(self): + self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA) + self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB) + self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN) + self.assertEquals(signal.getsignal(signal.SIGALRM), + signal.default_int_handler) + + # Launch an external script to send us signals. + # We expect the external script to: + # send HUP, which invokes handlerA to set a_called + # send USR1, which invokes handlerB to set b_called and raise + # HandlerBCalled + # send USR2, which is ignored + # + # Then we expect the alarm to go off, and its handler raises + # KeyboardInterrupt, finally getting us out of the loop. + + if test_support.verbose: + verboseflag = '-x' + else: + verboseflag = '+x' + + pid = self.pid + if test_support.verbose: + print("test runner's pid is", pid) + + # Shell script that will send us asynchronous signals + script = """ + ( + set %(verboseflag)s + sleep 2 + kill -HUP %(pid)d + sleep 2 + kill -USR1 %(pid)d + sleep 2 + kill -USR2 %(pid)d + ) & + """ % vars() + + signal.alarm(self.MAX_DURATION) + + handler_b_exception_raised = False + + os.system(script) + try: + if test_support.verbose: + print("starting pause() loop...") + while 1: + try: + if test_support.verbose: + print("call pause()...") + signal.pause() + if test_support.verbose: + print("pause() returned") + except HandlerBCalled: + handler_b_exception_raised = True + if test_support.verbose: + print("HandlerBCalled exception caught") + + except KeyboardInterrupt: + if test_support.verbose: + print("KeyboardInterrupt (the alarm() went off)") + + self.assert_(self.a_called) + self.assert_(self.b_called) + self.assert_(handler_b_exception_raised) + + def setUp(self): + # Install handlers. + self.hup = signal.signal(signal.SIGHUP, self.handlerA) + self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB) + self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN) + self.alrm = signal.signal(signal.SIGALRM, + signal.default_int_handler) + self.a_called = False + self.b_called = False + self.pid = os.getpid() + self.fork_pid = self.spawn_force_test_exit_process(self.pid) + + def tearDown(self): + # Forcibly kill the child we created to ping us if there was a + # test error. + try: + # Make sure we don't kill ourself if there was a fork + # error. + if self.fork_pid > 0: + os.kill(self.fork_pid, signal.SIGKILL) + except: + # If the child killed us, it has probably exited. Killing + # a non-existent process will raise an error which we + # don't care about. + pass + + # Restore handlers. + signal.alarm(0) # cancel alarm in case we died early + signal.signal(signal.SIGHUP, self.hup) + signal.signal(signal.SIGUSR1, self.usr1) + signal.signal(signal.SIGUSR2, self.usr2) + signal.signal(signal.SIGALRM, self.alrm) + + +class BasicSignalTests(unittest.TestCase): + def test_out_of_range_signal_number_raises_error(self): + self.assertRaises(ValueError, signal.getsignal, 4242) + + def trivial_signal_handler(*args): + pass + + self.assertRaises(ValueError, signal.signal, 4242, + trivial_signal_handler) + + def test_setting_signal_handler_to_none_raises_error(self): + self.assertRaises(TypeError, signal.signal, + signal.SIGUSR1, None) + +def test_main(): + if sys.platform[:3] in ('win', 'os2'): + raise test_support.TestSkipped("Can't test signal on %s" % \ + sys.platform) + + test_support.run_unittest(BasicSignalTests, InterProcessSignalTests) + + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py new file mode 100644 index 0000000..35c6af9 --- /dev/null +++ b/Lib/test/test_ssl.py @@ -0,0 +1,372 @@ +# Test the support for SSL and sockets + +import sys +import unittest +from test import test_support +import socket +import errno +import threading +import subprocess +import time +import os +import pprint +import urllib +import shutil +import traceback + +# Optionally test SSL support, if we have it in the tested platform +skip_expected = False +try: + import ssl +except ImportError: + skip_expected = True + +CERTFILE = None + + +def handle_error(prefix): + exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) + sys.stdout.write(prefix + exc_format) + + +class BasicTests(unittest.TestCase): + + def testRudeShutdown(self): + # Some random port to connect to. + PORT = [9934] + + listener_ready = threading.Event() + listener_gone = threading.Event() + + # `listener` runs in a thread. It opens a socket listening on + # PORT, and sits in an accept() until the main thread connects. + # Then it rudely closes the socket, and sets Event `listener_gone` + # to let the main thread know the socket is gone. + def listener(): + s = socket.socket() + PORT[0] = test_support.bind_port(s, '', PORT[0]) + s.listen(5) + listener_ready.set() + s.accept() + s = None # reclaim the socket object, which also closes it + listener_gone.set() + + def connector(): + listener_ready.wait() + s = socket.socket() + s.connect(('localhost', PORT[0])) + listener_gone.wait() + try: + ssl_sock = socket.ssl(s) + except socket.sslerror: + pass + else: + raise test_support.TestFailed( + 'connecting to closed SSL socket should have failed') + + t = threading.Thread(target=listener) + t.start() + connector() + t.join() + + def testSSLconnect(self): + import os + with test_support.transient_internet(): + s = ssl.sslsocket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE) + s.connect(("pop.gmail.com", 995)) + c = s.getpeercert() + if c: + raise test_support.TestFailed("Peer cert %s shouldn't be here!") + s.close() + + # this should fail because we have no verification certs + s = ssl.sslsocket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + try: + s.connect(("pop.gmail.com", 995)) + except ssl.sslerror: + pass + finally: + s.close() + +class ConnectedTests(unittest.TestCase): + + def testTLSecho (self): + + s1 = socket.socket() + try: + s1.connect(('127.0.0.1', 10024)) + except: + handle_error("connection failure:\n") + raise test_support.TestFailed("Can't connect to test server") + else: + try: + c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1) + except: + handle_error("SSL handshake failure:\n") + raise test_support.TestFailed("Can't SSL-handshake with test server") + else: + if not c1: + raise test_support.TestFailed("Can't SSL-handshake with test server") + indata = "FOO\n" + c1.write(indata) + outdata = c1.read() + if outdata != indata.lower(): + raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower())) + c1.close() + + def testReadCert(self): + + s2 = socket.socket() + try: + s2.connect(('127.0.0.1', 10024)) + except: + handle_error("connection failure:\n") + raise test_support.TestFailed("Can't connect to test server") + else: + try: + c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1, + cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE) + except: + handle_error("SSL handshake failure:\n") + raise test_support.TestFailed("Can't SSL-handshake with test server") + else: + if not c2: + raise test_support.TestFailed("Can't SSL-handshake with test server") + cert = c2.getpeercert() + if not cert: + raise test_support.TestFailed("Can't get peer certificate.") + if not cert.has_key('subject'): + raise test_support.TestFailed( + "No subject field in certificate: %s." % + pprint.pformat(cert)) + if not (cert['subject'].has_key('organizationName')): + raise test_support.TestFailed( + "No 'organizationName' field in certificate subject: %s." % + pprint.pformat(cert)) + if (cert['subject']['organizationName'] != + "Python Software Foundation"): + raise test_support.TestFailed( + "Invalid 'organizationName' field in certificate subject; " + "should be 'Python Software Foundation'."); + c2.close() + + +class ThreadedEchoServer(threading.Thread): + + class ConnectionHandler(threading.Thread): + + def __init__(self, server, connsock): + self.server = server + self.running = False + self.sock = connsock + threading.Thread.__init__(self) + self.setDaemon(True) + + def run (self): + self.running = True + try: + sslconn = ssl.sslsocket(self.sock, server_side=True, + certfile=self.server.certificate, + ssl_version=self.server.protocol, + cert_reqs=self.server.certreqs) + except: + # here, we want to stop the server, because this shouldn't + # happen in the context of our test case + handle_error("Test server failure:\n") + self.running = False + # normally, we'd just stop here, but for the test + # harness, we want to stop the server + self.server.stop() + return + + while self.running: + try: + msg = sslconn.read() + if not msg: + # eof, so quit this handler + self.running = False + sslconn.close() + elif msg.strip() == 'over': + sslconn.close() + self.server.stop() + self.running = False + else: + if test_support.verbose: + sys.stdout.write("\nserver: %s\n" % msg.strip().lower()) + sslconn.write(msg.lower()) + except ssl.sslerror: + handle_error("Test server failure:\n") + sslconn.close() + self.running = False + # normally, we'd just stop here, but for the test + # harness, we want to stop the server + self.server.stop() + except: + handle_error('') + + def __init__(self, port, certificate, ssl_version=None, + certreqs=None, cacerts=None): + if ssl_version is None: + ssl_version = ssl.PROTOCOL_TLSv1 + if certreqs is None: + certreqs = ssl.CERT_NONE + self.certificate = certificate + self.protocol = ssl_version + self.certreqs = certreqs + self.cacerts = cacerts + self.sock = socket.socket() + self.flag = None + if hasattr(socket, 'SO_REUSEADDR'): + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(socket, 'SO_REUSEPORT'): + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + self.sock.bind(('127.0.0.1', port)) + self.active = False + threading.Thread.__init__(self) + self.setDaemon(False) + + def start (self, flag=None): + self.flag = flag + threading.Thread.start(self) + + def run (self): + self.sock.settimeout(0.5) + self.sock.listen(5) + self.active = True + if self.flag: + # signal an event + self.flag.set() + while self.active: + try: + newconn, connaddr = self.sock.accept() + if test_support.verbose: + sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n') + handler = self.ConnectionHandler(self, newconn) + handler.start() + except socket.timeout: + pass + except KeyboardInterrupt: + self.stop() + except: + handle_error("Test server failure:\n") + + def stop (self): + self.active = False + self.sock.close() + +CERTFILE_CONFIG_TEMPLATE = """ +# create RSA certs - Server + +[ req ] +default_bits = 1024 +encrypt_key = yes +distinguished_name = req_dn +x509_extensions = cert_type + +[ req_dn ] +countryName = Country Name (2 letter code) +countryName_default = US +countryName_min = 2 +countryName_max = 2 + +stateOrProvinceName = State or Province Name (full name) +stateOrProvinceName_default = %(state)s + +localityName = Locality Name (eg, city) +localityName_default = %(city)s + +0.organizationName = Organization Name (eg, company) +0.organizationName_default = %(organization)s + +organizationalUnitName = Organizational Unit Name (eg, section) +organizationalUnitName_default = %(unit)s + +0.commonName = Common Name (FQDN of your server) +0.commonName_default = %(common-name)s + +# To create a certificate for more than one name uncomment: +# 1.commonName = DNS alias of your server +# 2.commonName = DNS alias of your server +# ... +# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html +# to see how Netscape understands commonName. + +[ cert_type ] +nsCertType = server +""" + +def create_cert_files(hostname=None): + + """This is the routine that was run to create the certificate + and private key contained in keycert.pem.""" + + import tempfile, socket, os + d = tempfile.mkdtemp() + # now create a configuration file for the CA signing cert + fqdn = hostname or socket.getfqdn() + crtfile = os.path.join(d, "cert.pem") + conffile = os.path.join(d, "ca.conf") + fp = open(conffile, "w") + fp.write(CERTFILE_CONFIG_TEMPLATE % + {'state': "Delaware", + 'city': "Wilmington", + 'organization': "Python Software Foundation", + 'unit': "SSL", + 'common-name': fqdn, + }) + fp.close() + error = os.system( + "openssl req -batch -new -x509 -days 2000 -nodes -config %s " + "-keyout \"%s\" -out \"%s\" > /dev/null < /dev/null 2>&1" % + (conffile, crtfile, crtfile)) + # now we have a self-signed server cert in crtfile + os.unlink(conffile) + if (os.WEXITSTATUS(error) or + not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0): + if test_support.verbose: + sys.stdout.write("Unable to create certificate for test, " + + "error status %d\n" % (error >> 8)) + crtfile = None + elif test_support.verbose: + sys.stdout.write(open(crtfile, 'r').read() + '\n') + return d, crtfile + + +def test_main(verbose=False): + if skip_expected: + raise test_support.TestSkipped("socket module has no ssl support") + + global CERTFILE + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, + "keycert.pem") + if not CERTFILE: + sys.__stdout__.write("Skipping test_ssl ConnectedTests; " + "couldn't create a certificate.\n") + + tests = [BasicTests] + + server = None + if CERTFILE and test_support.is_resource_enabled('network'): + server = ThreadedEchoServer(10024, CERTFILE) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + tests.append(ConnectedTests) + + thread_info = test_support.threading_setup() + + try: + test_support.run_unittest(*tests) + finally: + if server is not None and server.active: + server.stop() + # wait for it to stop + server.join() + + test_support.threading_cleanup(*thread_info) + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_structmembers.py b/Lib/test/test_structmembers.py index c3d01c8..93d7bfb 100644 --- a/Lib/test/test_structmembers.py +++ b/Lib/test/test_structmembers.py @@ -5,7 +5,7 @@ from _testcapi import test_structmembersType, \ LONG_MAX, LONG_MIN, ULONG_MAX, \ LLONG_MAX, LLONG_MIN, ULLONG_MAX -import warnings, unittest +import warnings, unittest, sys from test import test_support ts=test_structmembersType(1,2,3,4,5,6,7,8,9.99999,10.1010101010) @@ -59,7 +59,7 @@ class ReadWriteTests(unittest.TestCase): class TestWarnings(unittest.TestCase): def has_warned(self, w): - self.assert_(w.category is RuntimeWarning) + self.assertEqual(w.category, RuntimeWarning) def test_byte_max(self): with test_support.catch_warning() as w: @@ -94,10 +94,13 @@ class TestWarnings(unittest.TestCase): def test_main(verbose=None): - test_support.run_unittest( - ReadWriteTests, - TestWarnings - ) + # Obscure hack so that this test passes after reloads or repeated calls + # to test_main (regrtest -R). + if '__warningregistry__' in globals(): + del globals()['__warningregistry__'] + if hasattr(sys, '__warningregistry__'): + del sys.__warningregistry__ + test_support.run_unittest(__name__) if __name__ == "__main__": test_main(verbose=True) diff --git a/Lib/test/test_structseq.py b/Lib/test/test_structseq.py index 12ebef9..7a18fb2 100644 --- a/Lib/test/test_structseq.py +++ b/Lib/test/test_structseq.py @@ -97,6 +97,18 @@ class StructSeqTest(unittest.TestCase): t = time.gmtime() x = t.__reduce__() + def test_extended_getslice(self): + # Test extended slicing by comparing with list slicing. + t = time.gmtime() + L = list(t) + indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) + for start in indices: + for stop in indices: + # Skip step 0 (invalid) + for step in indices[1:]: + self.assertEqual(list(t[start:stop:step]), + L[start:stop:step]) + def test_main(): test_support.run_unittest(StructSeqTest) diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 2d9612e..99f57e6 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -362,6 +362,22 @@ def transient_internet(): return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset) +@contextlib.contextmanager +def captured_stdout(): + """Run the with statement body using a StringIO object as sys.stdout. + Example use:: + + with captured_stdout() as s: + print "hello" + assert s.getvalue() == "hello" + """ + import io + orig_stdout = sys.stdout + sys.stdout = io.StringIO() + yield sys.stdout + sys.stdout = orig_stdout + + #======================================================================= # Decorator for running a function in a different locale, correctly resetting # it afterwards. diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 8a59879..10498bf 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -140,11 +140,25 @@ class UstarReadTest(ReadTest): class MiscReadTest(ReadTest): - def test_no_filename(self): + def test_no_name_argument(self): fobj = open(self.tarname, "rb") tar = tarfile.open(fileobj=fobj, mode=self.mode) self.assertEqual(tar.name, os.path.abspath(fobj.name)) + def test_no_name_attribute(self): + data = open(self.tarname, "rb").read() + fobj = io.BytesIO(data) + self.assertRaises(AttributeError, getattr, fobj, "name") + tar = tarfile.open(fileobj=fobj, mode=self.mode) + self.assertEqual(tar.name, None) + + def test_empty_name_attribute(self): + data = open(self.tarname, "rb").read() + fobj = io.BytesIO(data) + fobj.name = "" + tar = tarfile.open(fileobj=fobj, mode=self.mode) + self.assertEqual(tar.name, None) + def test_fail_comp(self): # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. if self.mode == "r:": diff --git a/Lib/test/test_userstring.py b/Lib/test/test_userstring.py index ec0f1a9..8bd8d10 100755 --- a/Lib/test/test_userstring.py +++ b/Lib/test/test_userstring.py @@ -3,6 +3,7 @@ # UserString instances should behave similar to builtin string objects. import unittest +import string from test import test_support, string_tests from UserString import UserString, MutableString @@ -86,6 +87,28 @@ class MutableStringTest(UserStringTest): del s[-1:10] self.assertEqual(s, "fo") + def test_extended_set_del_slice(self): + indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100) + orig = string.ascii_letters + string.digits + for start in indices: + for stop in indices: + # Use indices[1:] when MutableString can handle real + # extended slices + for step in (None, 1, -1): + s = self.type2test(orig) + L = list(orig) + # Make sure we have a slice of exactly the right length, + # but with (hopefully) different data. + data = L[start:stop:step] + data.reverse() + L[start:stop:step] = data + s[start:stop:step] = "".join(data) + self.assertEquals(s, "".join(L)) + + del L[start:stop:step] + del s[start:stop:step] + self.assertEquals(s, "".join(L)) + def test_immutable(self): s = self.type2test("foobar") s2 = s.immutable() diff --git a/Lib/test/test_winreg.py b/Lib/test/test_winreg.py index 9984af1..fc898b8 100644 --- a/Lib/test/test_winreg.py +++ b/Lib/test/test_winreg.py @@ -3,8 +3,9 @@ from _winreg import * import os, sys +import unittest -from test.test_support import verify +from test import test_support test_key_name = "SOFTWARE\\Python Registry Test Key - Delete Me" @@ -13,137 +14,152 @@ test_data = [ ("String Val", "A string value", REG_SZ), ("StringExpand", "The path is %path%", REG_EXPAND_SZ), ("Multi-string", ["Lots", "of", "string", "values"], REG_MULTI_SZ), - ("Raw Data", bytes("binary"+chr(0)+"data"), REG_BINARY), + ("Raw Data", b"binary\x00data", REG_BINARY), ("Big String", "x"*(2**14-1), REG_SZ), ("Big Binary", b"x"*(2**14), REG_BINARY), ] -def WriteTestData(root_key): - # Set the default value for this key. - SetValue(root_key, test_key_name, REG_SZ, "Default value") - key = CreateKey(root_key, test_key_name) - # Create a sub-key - sub_key = CreateKey(key, "sub_key") - # Give the sub-key some named values - - for value_name, value_data, value_type in test_data: - SetValueEx(sub_key, value_name, 0, value_type, value_data) - - # Check we wrote as many items as we thought. - nkeys, nvalues, since_mod = QueryInfoKey(key) - verify(nkeys==1, "Not the correct number of sub keys") - verify(nvalues==1, "Not the correct number of values") - nkeys, nvalues, since_mod = QueryInfoKey(sub_key) - verify(nkeys==0, "Not the correct number of sub keys") - verify(nvalues==len(test_data), "Not the correct number of values") - # Close this key this way... - # (but before we do, copy the key as an integer - this allows - # us to test that the key really gets closed). - int_sub_key = int(sub_key) - CloseKey(sub_key) - try: - QueryInfoKey(int_sub_key) - raise RuntimeError, "It appears the CloseKey() function does not close the actual key!" - except EnvironmentError: - pass - # ... and close that key that way :-) - int_key = int(key) - key.Close() - try: - QueryInfoKey(int_key) - raise RuntimeError, "It appears the key.Close() function does not close the actual key!" - except EnvironmentError: - pass - -def ReadTestData(root_key): - # Check we can get default value for this key. - val = QueryValue(root_key, test_key_name) - verify(type(val) is str and val=="Default value", "Registry didn't give back the correct value") - - key = OpenKey(root_key, test_key_name) - # Read the sub-keys - sub_key = OpenKey(key, "sub_key") - # Check I can enumerate over the values. - index = 0 - while 1: +class WinregTests(unittest.TestCase): + remote_name = None + + def WriteTestData(self, root_key): + # Set the default value for this key. + SetValue(root_key, test_key_name, REG_SZ, "Default value") + key = CreateKey(root_key, test_key_name) + # Create a sub-key + sub_key = CreateKey(key, "sub_key") + # Give the sub-key some named values + + for value_name, value_data, value_type in test_data: + SetValueEx(sub_key, value_name, 0, value_type, value_data) + + # Check we wrote as many items as we thought. + nkeys, nvalues, since_mod = QueryInfoKey(key) + self.assertEquals(nkeys, 1, "Not the correct number of sub keys") + self.assertEquals(nvalues, 1, "Not the correct number of values") + nkeys, nvalues, since_mod = QueryInfoKey(sub_key) + self.assertEquals(nkeys, 0, "Not the correct number of sub keys") + self.assertEquals(nvalues, len(test_data), + "Not the correct number of values") + # Close this key this way... + # (but before we do, copy the key as an integer - this allows + # us to test that the key really gets closed). + int_sub_key = int(sub_key) + CloseKey(sub_key) try: - data = EnumValue(sub_key, index) + QueryInfoKey(int_sub_key) + self.fail("It appears the CloseKey() function does " + "not close the actual key!") except EnvironmentError: - break - verify(data in test_data, "Didn't read back the correct test data") - index = index + 1 - verify(index==len(test_data), "Didn't read the correct number of items") - # Check I can directly access each item - for value_name, value_data, value_type in test_data: - read_val, read_typ = QueryValueEx(sub_key, value_name) - verify(read_val==value_data and read_typ == value_type, \ - "Could not directly read the value" ) - sub_key.Close() - # Enumerate our main key. - read_val = EnumKey(key, 0) - verify(read_val == "sub_key", "Read subkey value wrong") - try: - EnumKey(key, 1) - verify(0, "Was able to get a second key when I only have one!") - except EnvironmentError: - pass - - key.Close() - -def DeleteTestData(root_key): - key = OpenKey(root_key, test_key_name, 0, KEY_ALL_ACCESS) - sub_key = OpenKey(key, "sub_key", 0, KEY_ALL_ACCESS) - # It is not necessary to delete the values before deleting - # the key (although subkeys must not exist). We delete them - # manually just to prove we can :-) - for value_name, value_data, value_type in test_data: - DeleteValue(sub_key, value_name) - - nkeys, nvalues, since_mod = QueryInfoKey(sub_key) - verify(nkeys==0 and nvalues==0, "subkey not empty before delete") - sub_key.Close() - DeleteKey(key, "sub_key") + pass + # ... and close that key that way :-) + int_key = int(key) + key.Close() + try: + QueryInfoKey(int_key) + self.fail("It appears the key.Close() function " + "does not close the actual key!") + except EnvironmentError: + pass + + def ReadTestData(self, root_key): + # Check we can get default value for this key. + val = QueryValue(root_key, test_key_name) + self.assertEquals(val, "Default value", + "Registry didn't give back the correct value") - try: - # Shouldnt be able to delete it twice! - DeleteKey(key, "sub_key") - verify(0, "Deleting the key twice succeeded") - except EnvironmentError: - pass - key.Close() - DeleteKey(root_key, test_key_name) - # Opening should now fail! - try: key = OpenKey(root_key, test_key_name) - verify(0, "Could open the non-existent key") - except WindowsError: # Use this error name this time - pass - -def TestAll(root_key): - WriteTestData(root_key) - ReadTestData(root_key) - DeleteTestData(root_key) - -# Test on my local machine. -TestAll(HKEY_CURRENT_USER) -print("Local registry tests worked") -try: - remote_name = sys.argv[sys.argv.index("--remote")+1] -except (IndexError, ValueError): - remote_name = None + # Read the sub-keys + sub_key = OpenKey(key, "sub_key") + # Check I can enumerate over the values. + index = 0 + while 1: + try: + data = EnumValue(sub_key, index) + except EnvironmentError: + break + self.assertEquals(data in test_data, True, + "Didn't read back the correct test data") + index = index + 1 + self.assertEquals(index, len(test_data), + "Didn't read the correct number of items") + # Check I can directly access each item + for value_name, value_data, value_type in test_data: + read_val, read_typ = QueryValueEx(sub_key, value_name) + self.assertEquals(read_val, value_data, + "Could not directly read the value") + self.assertEquals(read_typ, value_type, + "Could not directly read the value") + sub_key.Close() + # Enumerate our main key. + read_val = EnumKey(key, 0) + self.assertEquals(read_val, "sub_key", "Read subkey value wrong") + try: + EnumKey(key, 1) + self.fail("Was able to get a second key when I only have one!") + except EnvironmentError: + pass + + key.Close() + + def DeleteTestData(self, root_key): + key = OpenKey(root_key, test_key_name, 0, KEY_ALL_ACCESS) + sub_key = OpenKey(key, "sub_key", 0, KEY_ALL_ACCESS) + # It is not necessary to delete the values before deleting + # the key (although subkeys must not exist). We delete them + # manually just to prove we can :-) + for value_name, value_data, value_type in test_data: + DeleteValue(sub_key, value_name) + + nkeys, nvalues, since_mod = QueryInfoKey(sub_key) + self.assertEquals(nkeys, 0, "subkey not empty before delete") + self.assertEquals(nvalues, 0, "subkey not empty before delete") + sub_key.Close() + DeleteKey(key, "sub_key") -if remote_name is not None: + try: + # Shouldnt be able to delete it twice! + DeleteKey(key, "sub_key") + self.fail("Deleting the key twice succeeded") + except EnvironmentError: + pass + key.Close() + DeleteKey(root_key, test_key_name) + # Opening should now fail! + try: + key = OpenKey(root_key, test_key_name) + self.fail("Could open the non-existent key") + except WindowsError: # Use this error name this time + pass + + def TestAll(self, root_key): + self.WriteTestData(root_key) + self.ReadTestData(root_key) + self.DeleteTestData(root_key) + + def testLocalMachineRegistryWorks(self): + self.TestAll(HKEY_CURRENT_USER) + + def testConnectRegistryToLocalMachineWorks(self): + # perform minimal ConnectRegistry test which just invokes it + h = ConnectRegistry(None, HKEY_LOCAL_MACHINE) + h.Close() + + def testRemoteMachineRegistryWorks(self): + if not self.remote_name: + raise test_support.TestSkipped("Remote machine name " + "not specified.") + remote_key = ConnectRegistry(self.remote_name, HKEY_CURRENT_USER) + self.TestAll(remote_key) + +def test_main(): + test_support.run_unittest(WinregTests) + +if __name__ == "__main__": try: - remote_key = ConnectRegistry(remote_name, HKEY_CURRENT_USER) - except EnvironmentError as exc: - print("Could not connect to the remote machine -", exc.strerror) - remote_key = None - if remote_key is not None: - TestAll(remote_key) - print("Remote registry tests worked") -else: - print("Remote registry calls can be tested using", end=' ') - print("'test_winreg.py --remote \\\\machine_name'") - # perform minimal ConnectRegistry test which just invokes it - h = ConnectRegistry(None, HKEY_LOCAL_MACHINE) - h.Close() + WinregTests.remote_name = sys.argv[sys.argv.index("--remote")+1] + except (IndexError, ValueError): + print("Remote registry calls can be tested using", + "'test_winreg.py --remote \\\\machine_name'") + WinregTests.remote_name = None + test_main() diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index ef61c55..07c7ad6 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -284,6 +284,27 @@ def http_server(evt, numrequests): evt.set() +def is_unavailable_exception(e): + '''Returns True if the given ProtocolError is the product of a server-side + exception caused by the 'temporarily unavailable' response sometimes + given by operations on non-blocking sockets.''' + # sometimes we get a -1 error code and/or empty headers + if e.errcode == -1 or e.headers is None: + return True + + exc_mess = e.headers.get('X-exception') + if exc_mess and 'temporarily unavailable' in exc_mess.lower(): + return True + + return False + +# NOTE: The tests in SimpleServerTestCase will ignore failures caused by +# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This +# condition occurs infrequently on some platforms, frequently on others, and +# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket. +# If the server class is updated at some point in the future to handle this +# situation more gracefully, these tests should be modified appropriately. + class SimpleServerTestCase(unittest.TestCase): def setUp(self): # enable traceback reporting @@ -291,7 +312,7 @@ class SimpleServerTestCase(unittest.TestCase): self.evt = threading.Event() # start server thread to handle requests - serv_args = (self.evt, 2) + serv_args = (self.evt, 1) threading.Thread(target=http_server, args=serv_args).start() # wait for port to be assigned to server @@ -314,8 +335,10 @@ class SimpleServerTestCase(unittest.TestCase): p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT) self.assertEqual(p.pow(6,8), 6**8) except xmlrpclib.ProtocolError as e: - # protocol error; provide additional information in test output - self.fail("%s\n%s" % (e, e.headers)) + # ignore failures due to non-blocking socket 'unavailable' errors + if not is_unavailable_exception(e): + # protocol error; provide additional information in test output + self.fail("%s\n%s" % (e, e.headers)) def test_introspection1(self): try: @@ -325,8 +348,10 @@ class SimpleServerTestCase(unittest.TestCase): 'system.methodHelp', 'system.methodSignature', 'system.multicall']) self.assertEqual(set(meth), expected_methods) except xmlrpclib.ProtocolError as e: - # protocol error; provide additional information in test output - self.fail("%s\n%s" % (e, e.headers)) + # ignore failures due to non-blocking socket 'unavailable' errors + if not is_unavailable_exception(e): + # protocol error; provide additional information in test output + self.fail("%s\n%s" % (e, e.headers)) def test_introspection2(self): try: @@ -334,19 +359,23 @@ class SimpleServerTestCase(unittest.TestCase): divhelp = p.system.methodHelp('div') self.assertEqual(divhelp, 'This is the div function') except xmlrpclib.ProtocolError as e: - # protocol error; provide additional information in test output - self.fail("%s\n%s" % (e, e.headers)) + # ignore failures due to non-blocking socket 'unavailable' errors + if not is_unavailable_exception(e): + # protocol error; provide additional information in test output + self.fail("%s\n%s" % (e, e.headers)) def test_introspection3(self): # the SimpleXMLRPCServer doesn't support signatures, but - # at least check that we can try + # at least check that we can try making the call try: p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT) divsig = p.system.methodSignature('div') self.assertEqual(divsig, 'signatures not supported') except xmlrpclib.ProtocolError as e: - # protocol error; provide additional information in test output - self.fail("%s\n%s" % (e, e.headers)) + # ignore failures due to non-blocking socket 'unavailable' errors + if not is_unavailable_exception(e): + # protocol error; provide additional information in test output + self.fail("%s\n%s" % (e, e.headers)) def test_multicall(self): try: @@ -360,8 +389,10 @@ class SimpleServerTestCase(unittest.TestCase): self.assertEqual(pow_result, 6**8) self.assertEqual(div_result, 127//42) except xmlrpclib.ProtocolError as e: - # protocol error; provide additional information in test output - self.fail("%s\n%s" % (e, e.headers)) + # ignore failures due to non-blocking socket 'unavailable' errors + if not is_unavailable_exception(e): + # protocol error; provide additional information in test output + self.fail("%s\n%s" % (e, e.headers)) # This is a contrived way to make a failure occur on the server side |