summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rwxr-xr-xLib/UserString.py42
-rw-r--r--Lib/ctypes/__init__.py2
-rwxr-xr-xLib/runpy.py86
-rw-r--r--Lib/socket.py4
-rw-r--r--Lib/ssl.py253
-rw-r--r--Lib/tarfile.py2
-rw-r--r--Lib/test/keycert.pem32
-rw-r--r--Lib/test/list_tests.py2
-rw-r--r--Lib/test/output/test_class64
-rw-r--r--Lib/test/output/test_frozen4
-rw-r--r--Lib/test/output/test_ossaudiodev2
-rw-r--r--Lib/test/output/test_signal2
-rw-r--r--Lib/test/output/test_winreg3
-rw-r--r--Lib/test/string_tests.py14
-rwxr-xr-xLib/test/test_array.py34
-rw-r--r--Lib/test/test_buffer.py12
-rw-r--r--Lib/test/test_class.py811
-rw-r--r--Lib/test/test_cmd_line.py40
-rw-r--r--Lib/test/test_frozen.py59
-rw-r--r--Lib/test/test_mmap.py34
-rw-r--r--Lib/test/test_ossaudiodev.py6
-rw-r--r--Lib/test/test_pkg.py11
-rw-r--r--Lib/test/test_runpy.py66
-rw-r--r--Lib/test/test_signal.py333
-rw-r--r--Lib/test/test_ssl.py372
-rw-r--r--Lib/test/test_structmembers.py15
-rw-r--r--Lib/test/test_structseq.py12
-rw-r--r--Lib/test/test_support.py16
-rw-r--r--Lib/test/test_tarfile.py16
-rwxr-xr-xLib/test/test_userstring.py23
-rw-r--r--Lib/test/test_winreg.py266
-rw-r--r--Lib/test/test_xmlrpc.py55
32 files changed, 1892 insertions, 801 deletions
diff --git a/Lib/UserString.py b/Lib/UserString.py
index 500cd12..faad148 100755
--- a/Lib/UserString.py
+++ b/Lib/UserString.py
@@ -185,15 +185,41 @@ class MutableString(UserString):
def __hash__(self):
raise TypeError, "unhashable type (it is mutable)"
def __setitem__(self, index, sub):
- if index < 0:
- index += len(self.data)
- if index < 0 or index >= len(self.data): raise IndexError
- self.data = self.data[:index] + sub + self.data[index+1:]
+ if isinstance(index, slice):
+ if isinstance(sub, UserString):
+ sub = sub.data
+ elif not isinstance(sub, basestring):
+ sub = str(sub)
+ start, stop, step = index.indices(len(self.data))
+ if step == -1:
+ start, stop = stop+1, start+1
+ sub = sub[::-1]
+ elif step != 1:
+ # XXX(twouters): I guess we should be reimplementing
+ # the extended slice assignment/deletion algorithm here...
+ raise TypeError, "invalid step in slicing assignment"
+ start = min(start, stop)
+ self.data = self.data[:start] + sub + self.data[stop:]
+ else:
+ if index < 0:
+ index += len(self.data)
+ if index < 0 or index >= len(self.data): raise IndexError
+ self.data = self.data[:index] + sub + self.data[index+1:]
def __delitem__(self, index):
- if index < 0:
- index += len(self.data)
- if index < 0 or index >= len(self.data): raise IndexError
- self.data = self.data[:index] + self.data[index+1:]
+ if isinstance(index, slice):
+ start, stop, step = index.indices(len(self.data))
+ if step == -1:
+ start, stop = stop+1, start+1
+ elif step != 1:
+ # XXX(twouters): see same block in __setitem__
+ raise TypeError, "invalid step in slicing deletion"
+ start = min(start, stop)
+ self.data = self.data[:start] + self.data[stop:]
+ else:
+ if index < 0:
+ index += len(self.data)
+ if index < 0 or index >= len(self.data): raise IndexError
+ self.data = self.data[:index] + self.data[index+1:]
def __setslice__(self, start, end, sub):
start = max(start, 0); end = max(end, 0)
if isinstance(sub, UserString):
diff --git a/Lib/ctypes/__init__.py b/Lib/ctypes/__init__.py
index 2cddb11..cff5483 100644
--- a/Lib/ctypes/__init__.py
+++ b/Lib/ctypes/__init__.py
@@ -336,7 +336,7 @@ class CDLL(object):
<obj>['qsort'] -> callable object
Calling the functions releases the Python GIL during the call and
- reaquires it afterwards.
+ reacquires it afterwards.
"""
class _FuncPtr(_CFuncPtr):
_flags_ = _FUNCFLAG_CDECL
diff --git a/Lib/runpy.py b/Lib/runpy.py
index b463f2b..406e081 100755
--- a/Lib/runpy.py
+++ b/Lib/runpy.py
@@ -21,8 +21,9 @@ __all__ = [
]
-def _run_code(code, run_globals, init_globals,
- mod_name, mod_fname, mod_loader):
+def _run_code(code, run_globals, init_globals=None,
+ mod_name=None, mod_fname=None,
+ mod_loader=None):
"""Helper for _run_module_code"""
if init_globals is not None:
run_globals.update(init_globals)
@@ -33,21 +34,31 @@ def _run_code(code, run_globals, init_globals,
return run_globals
def _run_module_code(code, init_globals=None,
- mod_name=None, mod_fname=None,
- mod_loader=None, alter_sys=False):
+ mod_name=None, mod_fname=None,
+ mod_loader=None):
"""Helper for run_module"""
# Set up the top level namespace dictionary
- if alter_sys:
- # Modify sys.argv[0] and sys.modules[mod_name]
- sys.argv[0] = mod_fname
- module = imp.new_module(mod_name)
- sys.modules[mod_name] = module
- mod_globals = module.__dict__
- else:
- # Leave the sys module alone
- mod_globals = {}
- return _run_code(code, mod_globals, init_globals,
- mod_name, mod_fname, mod_loader)
+ temp_module = imp.new_module(mod_name)
+ mod_globals = temp_module.__dict__
+ # Modify sys.argv[0] and sys.module[mod_name]
+ saved_argv0 = sys.argv[0]
+ restore_module = mod_name in sys.modules
+ if restore_module:
+ saved_module = sys.modules[mod_name]
+ sys.argv[0] = mod_fname
+ sys.modules[mod_name] = temp_module
+ try:
+ _run_code(code, mod_globals, init_globals,
+ mod_name, mod_fname, mod_loader)
+ finally:
+ sys.argv[0] = saved_argv0
+ if restore_module:
+ sys.modules[mod_name] = saved_module
+ else:
+ del sys.modules[mod_name]
+ # Copy the globals of the temporary module, as they
+ # may be cleared when the temporary module goes away
+ return mod_globals.copy()
# This helper is needed due to a missing component in the PEP 302
@@ -60,13 +71,8 @@ def _get_filename(loader, mod_name):
else:
return get_filename(mod_name)
-
-def run_module(mod_name, init_globals=None,
- run_name=None, alter_sys=False):
- """Execute a module's code without importing it
-
- Returns the resulting top level namespace dictionary
- """
+# Helper to get the loader, code and filename for a module
+def _get_module_details(mod_name):
loader = get_loader(mod_name)
if loader is None:
raise ImportError("No module named %s" % mod_name)
@@ -77,10 +83,40 @@ def run_module(mod_name, init_globals=None,
if code is None:
raise ImportError("No code object available for %s" % mod_name)
filename = _get_filename(loader, mod_name)
+ return loader, code, filename
+
+
+# XXX ncoghlan: Should this be documented and made public?
+def _run_module_as_main(mod_name, set_argv0=True):
+ """Runs the designated module in the __main__ namespace
+
+ These __*__ magic variables will be overwritten:
+ __file__
+ __loader__
+ """
+ loader, code, fname = _get_module_details(mod_name)
+ main_globals = sys.modules["__main__"].__dict__
+ if set_argv0:
+ sys.argv[0] = fname
+ return _run_code(code, main_globals, None,
+ "__main__", fname, loader)
+
+def run_module(mod_name, init_globals=None,
+ run_name=None, alter_sys=False):
+ """Execute a module's code without importing it
+
+ Returns the resulting top level namespace dictionary
+ """
+ loader, code, fname = _get_module_details(mod_name)
if run_name is None:
run_name = mod_name
- return _run_module_code(code, init_globals, run_name,
- filename, loader, alter_sys)
+ if alter_sys:
+ return _run_module_code(code, init_globals, run_name,
+ fname, loader)
+ else:
+ # Leave the sys module alone
+ return _run_code(code, {}, init_globals,
+ run_name, fname, loader)
if __name__ == "__main__":
@@ -89,4 +125,4 @@ if __name__ == "__main__":
print("No module specified for execution", file=sys.stderr)
else:
del sys.argv[0] # Make the requested module sys.argv[0]
- run_module(sys.argv[0], run_name="__main__", alter_sys=True)
+ _run_module_as_main(sys.argv[0])
diff --git a/Lib/socket.py b/Lib/socket.py
index bffea15..fca44ea 100644
--- a/Lib/socket.py
+++ b/Lib/socket.py
@@ -65,6 +65,10 @@ __all__ = ["getfqdn"]
__all__.extend(os._get_exports_list(_socket))
if _have_ssl:
__all__.extend(os._get_exports_list(_ssl))
+ def ssl(sock, keyfile=None, certfile=None):
+ import ssl as realssl
+ return realssl.sslwrap_simple(sock, keyfile, certfile)
+ __all__.append("ssl")
# WSA error codes
if sys.platform.lower().startswith("win"):
diff --git a/Lib/ssl.py b/Lib/ssl.py
new file mode 100644
index 0000000..99f6257
--- /dev/null
+++ b/Lib/ssl.py
@@ -0,0 +1,253 @@
+# Wrapper module for _ssl, providing some additional facilities
+# implemented in Python. Written by Bill Janssen.
+
+"""\
+This module provides some more Pythonic support for SSL.
+
+Object types:
+
+ sslsocket -- subtype of socket.socket which does SSL over the socket
+
+Exceptions:
+
+ sslerror -- exception raised for I/O errors
+
+Functions:
+
+ cert_time_to_seconds -- convert time string used for certificate
+ notBefore and notAfter functions to integer
+ seconds past the Epoch (the time values
+ returned from time.time())
+
+ fetch_server_certificate (HOST, PORT) -- fetch the certificate provided
+ by the server running on HOST at port PORT. No
+ validation of the certificate is performed.
+
+Integer constants:
+
+SSL_ERROR_ZERO_RETURN
+SSL_ERROR_WANT_READ
+SSL_ERROR_WANT_WRITE
+SSL_ERROR_WANT_X509_LOOKUP
+SSL_ERROR_SYSCALL
+SSL_ERROR_SSL
+SSL_ERROR_WANT_CONNECT
+
+SSL_ERROR_EOF
+SSL_ERROR_INVALID_ERROR_CODE
+
+The following group define certificate requirements that one side is
+allowing/requiring from the other side:
+
+CERT_NONE - no certificates from the other side are required (or will
+ be looked at if provided)
+CERT_OPTIONAL - certificates are not required, but if provided will be
+ validated, and if validation fails, the connection will
+ also fail
+CERT_REQUIRED - certificates are required, and will be validated, and
+ if validation fails, the connection will also fail
+
+The following constants identify various SSL protocol variants:
+
+PROTOCOL_SSLv2
+PROTOCOL_SSLv3
+PROTOCOL_SSLv23
+PROTOCOL_TLSv1
+"""
+
+import os, sys
+
+import _ssl # if we can't import it, let the error propagate
+from socket import socket
+from _ssl import sslerror
+from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
+from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1
+
+# Root certs:
+#
+# The "ca_certs" argument to sslsocket() expects a file containing one or more
+# certificates that are roots of various certificate signing chains. This file
+# contains the certificates in PEM format (RFC ) where each certificate is
+# encoded in base64 encoding and surrounded with a header and footer:
+# -----BEGIN CERTIFICATE-----
+# ... (CA certificate in base64 encoding) ...
+# -----END CERTIFICATE-----
+# The various certificates in the file are just concatenated together:
+# -----BEGIN CERTIFICATE-----
+# ... (CA certificate in base64 encoding) ...
+# -----END CERTIFICATE-----
+# -----BEGIN CERTIFICATE-----
+# ... (a second CA certificate in base64 encoding) ...
+# -----END CERTIFICATE-----
+#
+# Some "standard" root certificates are available at
+#
+# http://www.thawte.com/roots/ (for Thawte roots)
+# http://www.verisign.com/support/roots.html (for Verisign)
+
+class sslsocket (socket):
+
+ def __init__(self, sock, keyfile=None, certfile=None,
+ server_side=False, cert_reqs=CERT_NONE,
+ ssl_version=PROTOCOL_SSLv23, ca_certs=None):
+ socket.__init__(self, _sock=sock._sock)
+ if certfile and not keyfile:
+ keyfile = certfile
+ if server_side:
+ self._sslobj = _ssl.sslwrap(self._sock, 1, keyfile, certfile,
+ cert_reqs, ssl_version, ca_certs)
+ else:
+ # see if it's connected
+ try:
+ socket.getpeername(self)
+ except:
+ # no, no connection yet
+ self._sslobj = None
+ else:
+ # yes, create the SSL object
+ self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
+ cert_reqs, ssl_version, ca_certs)
+ self.keyfile = keyfile
+ self.certfile = certfile
+ self.cert_reqs = cert_reqs
+ self.ssl_version = ssl_version
+ self.ca_certs = ca_certs
+
+ def read(self, len=1024):
+ return self._sslobj.read(len)
+
+ def write(self, data):
+ return self._sslobj.write(data)
+
+ def getpeercert(self):
+ return self._sslobj.peer_certificate()
+
+ def send (self, data, flags=0):
+ if flags != 0:
+ raise ValueError(
+ "non-zero flags not allowed in calls to send() on %s" %
+ self.__class__)
+ return self._sslobj.write(data)
+
+ def send_to (self, data, addr, flags=0):
+ raise ValueError("send_to not allowed on instances of %s" %
+ self.__class__)
+
+ def sendall (self, data, flags=0):
+ if flags != 0:
+ raise ValueError(
+ "non-zero flags not allowed in calls to sendall() on %s" %
+ self.__class__)
+ return self._sslobj.write(data)
+
+ def recv (self, buflen=1024, flags=0):
+ if flags != 0:
+ raise ValueError(
+ "non-zero flags not allowed in calls to sendall() on %s" %
+ self.__class__)
+ return self._sslobj.read(data, buflen)
+
+ def recv_from (self, addr, buflen=1024, flags=0):
+ raise ValueError("recv_from not allowed on instances of %s" %
+ self.__class__)
+
+ def shutdown(self):
+ if self._sslobj:
+ self._sslobj.shutdown()
+ self._sslobj = None
+ else:
+ socket.shutdown(self)
+
+ def close(self):
+ if self._sslobj:
+ self.shutdown()
+ else:
+ socket.close(self)
+
+ def connect(self, addr):
+ # Here we assume that the socket is client-side, and not
+ # connected at the time of the call. We connect it, then wrap it.
+ if self._sslobj or (self.getsockname()[1] != 0):
+ raise ValueError("attempt to connect already-connected sslsocket!")
+ socket.connect(self, addr)
+ self._sslobj = _ssl.sslwrap(self._sock, 0, self.keyfile, self.certfile,
+ self.cert_reqs, self.ssl_version,
+ self.ca_certs)
+
+ def accept(self):
+ raise ValueError("accept() not supported on an sslsocket")
+
+
+# some utility functions
+
+def cert_time_to_seconds(cert_time):
+ import time
+ return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT"))
+
+# a replacement for the old socket.ssl function
+
+def sslwrap_simple (sock, keyfile=None, certfile=None):
+
+ return _ssl.sslwrap(sock._sock, 0, keyfile, certfile, CERT_NONE,
+ PROTOCOL_SSLv23, None)
+
+# fetch the certificate that the server is providing in PEM form
+
+def fetch_server_certificate (host, port):
+
+ import re, tempfile, os
+
+ def subproc(cmd):
+ from subprocess import Popen, PIPE, STDOUT
+ proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True)
+ status = proc.wait()
+ output = proc.stdout.read()
+ return status, output
+
+ def strip_to_x509_cert(certfile_contents, outfile=None):
+ m = re.search(r"^([-]+BEGIN CERTIFICATE[-]+[\r]*\n"
+ r".*[\r]*^[-]+END CERTIFICATE[-]+)$",
+ certfile_contents, re.MULTILINE | re.DOTALL)
+ if not m:
+ return None
+ else:
+ tn = tempfile.mktemp()
+ fp = open(tn, "w")
+ fp.write(m.group(1) + "\n")
+ fp.close()
+ try:
+ tn2 = (outfile or tempfile.mktemp())
+ status, output = subproc(r'openssl x509 -in "%s" -out "%s"' %
+ (tn, tn2))
+ if status != 0:
+ raise OperationError(status, tsig, output)
+ fp = open(tn2, 'rb')
+ data = fp.read()
+ fp.close()
+ os.unlink(tn2)
+ return data
+ finally:
+ os.unlink(tn)
+
+ if sys.platform.startswith("win"):
+ tfile = tempfile.mktemp()
+ fp = open(tfile, "w")
+ fp.write("quit\n")
+ fp.close()
+ try:
+ status, output = subproc(
+ 'openssl s_client -connect "%s:%s" -showcerts < "%s"' %
+ (host, port, tfile))
+ finally:
+ os.unlink(tfile)
+ else:
+ status, output = subproc(
+ 'openssl s_client -connect "%s:%s" -showcerts < /dev/null' %
+ (host, port))
+ if status != 0:
+ raise OSError(status)
+ certtext = strip_to_x509_cert(output)
+ if not certtext:
+ raise ValueError("Invalid response received from server at %s:%s" %
+ (host, port))
+ return certtext
diff --git a/Lib/tarfile.py b/Lib/tarfile.py
index 98b774a..ffe711a 100644
--- a/Lib/tarfile.py
+++ b/Lib/tarfile.py
@@ -1508,7 +1508,7 @@ class TarFile(object):
if hasattr(fileobj, "mode"):
self._mode = fileobj.mode
self._extfileobj = True
- self.name = os.path.abspath(name)
+ self.name = os.path.abspath(name) if name else None
self.fileobj = fileobj
# Init attributes.
diff --git a/Lib/test/keycert.pem b/Lib/test/keycert.pem
new file mode 100644
index 0000000..2f46fcf
--- /dev/null
+++ b/Lib/test/keycert.pem
@@ -0,0 +1,32 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
+opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
+fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
+AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
+D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
+IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
+oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
+ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
+loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
+oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
+z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
+ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
+q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
+VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
+IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
+U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
+NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
+bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
+dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
+aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
+m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
+M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
+fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
+AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
+08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
+CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
+iHkC6gGdBJhogs4=
+-----END CERTIFICATE-----
diff --git a/Lib/test/list_tests.py b/Lib/test/list_tests.py
index 0d893f2..a8a7e76 100644
--- a/Lib/test/list_tests.py
+++ b/Lib/test/list_tests.py
@@ -179,8 +179,10 @@ class CommonTest(seq_tests.CommonTest):
self.assertEqual(a, self.type2test(range(10)))
self.assertRaises(TypeError, a.__setslice__, 0, 1, 5)
+ self.assertRaises(TypeError, a.__setitem__, slice(0, 1, 5))
self.assertRaises(TypeError, a.__setslice__)
+ self.assertRaises(TypeError, a.__setitem__)
def test_delslice(self):
a = self.type2test([0, 1])
diff --git a/Lib/test/output/test_class b/Lib/test/output/test_class
deleted file mode 100644
index d0fed75..0000000
--- a/Lib/test/output/test_class
+++ /dev/null
@@ -1,64 +0,0 @@
-test_class
-__init__: ()
-__add__: (1,)
-__radd__: (1,)
-__sub__: (1,)
-__rsub__: (1,)
-__mul__: (1,)
-__rmul__: (1,)
-__truediv__: (1,)
-__rtruediv__: (1,)
-__mod__: (1,)
-__rmod__: (1,)
-__divmod__: (1,)
-__rdivmod__: (1,)
-__pow__: (1,)
-__rpow__: (1,)
-__rshift__: (1,)
-__rrshift__: (1,)
-__lshift__: (1,)
-__rlshift__: (1,)
-__and__: (1,)
-__rand__: (1,)
-__or__: (1,)
-__ror__: (1,)
-__xor__: (1,)
-__rxor__: (1,)
-__contains__: (1,)
-__getitem__: (1,)
-__setitem__: (1, 1)
-__delitem__: (1,)
-__getslice__: (0, 42)
-__setslice__: (0, 42, 'The Answer')
-__delslice__: (0, 42)
-__getitem__: (slice(2, 1024, 10),)
-__setitem__: (slice(2, 1024, 10), 'A lot')
-__delitem__: (slice(2, 1024, 10),)
-__getitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100),)
-__setitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100), 'Strange')
-__delitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100),)
-__getitem__: (slice(None, 42, None),)
-__setitem__: (slice(None, 42, None), 'The Answer')
-__delitem__: (slice(None, 42, None),)
-__neg__: ()
-__pos__: ()
-__abs__: ()
-__int__: ()
-__int__: ()
-__float__: ()
-__index__: ()
-__hash__: ()
-__repr__: ()
-__str__: ()
-__eq__: (1,)
-__lt__: (1,)
-__gt__: (1,)
-__ne__: (1,)
-__eq__: (1,)
-__gt__: (1,)
-__lt__: (1,)
-__ne__: (1,)
-__del__: ()
-__getattr__: ('spam',)
-__setattr__: ('eggs', 'spam, spam, spam and ham')
-__delattr__: ('cardinal',)
diff --git a/Lib/test/output/test_frozen b/Lib/test/output/test_frozen
deleted file mode 100644
index 76f17db..0000000
--- a/Lib/test/output/test_frozen
+++ /dev/null
@@ -1,4 +0,0 @@
-test_frozen
-Hello world...
-Hello world...
-Hello world...
diff --git a/Lib/test/output/test_ossaudiodev b/Lib/test/output/test_ossaudiodev
deleted file mode 100644
index f0df5d2..0000000
--- a/Lib/test/output/test_ossaudiodev
+++ /dev/null
@@ -1,2 +0,0 @@
-test_ossaudiodev
-playing test sound file (expected running time: 2.93 sec)
diff --git a/Lib/test/output/test_signal b/Lib/test/output/test_signal
deleted file mode 100644
index aa64689..0000000
--- a/Lib/test/output/test_signal
+++ /dev/null
@@ -1,2 +0,0 @@
-test_signal
-starting pause() loop...
diff --git a/Lib/test/output/test_winreg b/Lib/test/output/test_winreg
deleted file mode 100644
index f47aa84..0000000
--- a/Lib/test/output/test_winreg
+++ /dev/null
@@ -1,3 +0,0 @@
-test_winreg
-Local registry tests worked
-Remote registry calls can be tested using 'test_winreg.py --remote \\machine_name'
diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py
index bafa23b..916a98f 100644
--- a/Lib/test/string_tests.py
+++ b/Lib/test/string_tests.py
@@ -935,7 +935,6 @@ class MixinStrUnicodeUserStringTest:
self.checkequal('abc', 'abc', '__getitem__', slice(0, 1000))
self.checkequal('a', 'abc', '__getitem__', slice(0, 1))
self.checkequal('', 'abc', '__getitem__', slice(0, 0))
- # FIXME What about negative indices? This is handled differently by [] and __getitem__(slice)
self.checkraises(TypeError, 'abc', '__getitem__', 'def')
@@ -949,10 +948,21 @@ class MixinStrUnicodeUserStringTest:
self.checkequal('', 'abc', '__getslice__', 1000, 1000)
self.checkequal('', 'abc', '__getslice__', 2000, 1000)
self.checkequal('', 'abc', '__getslice__', 2, 1)
- # FIXME What about negative indizes? This is handled differently by [] and __getslice__
self.checkraises(TypeError, 'abc', '__getslice__', 'def')
+ def test_extended_getslice(self):
+ # Test extended slicing by comparing with list slicing.
+ s = string.ascii_letters + string.digits
+ indices = (0, None, 1, 3, 41, -1, -2, -37)
+ for start in indices:
+ for stop in indices:
+ # Skip step 0 (invalid)
+ for step in indices[1:]:
+ L = list(s)[start:stop:step]
+ self.checkequal("".join(L), s, '__getitem__',
+ slice(start, stop, step))
+
def test_mul(self):
self.checkequal('', 'abc', '__mul__', -1)
self.checkequal('', 'abc', '__mul__', 0)
diff --git a/Lib/test/test_array.py b/Lib/test/test_array.py
index cf5c2e8..bae496e 100755
--- a/Lib/test/test_array.py
+++ b/Lib/test/test_array.py
@@ -468,6 +468,18 @@ class BaseTest(unittest.TestCase):
array.array(self.typecode)
)
+ def test_extended_getslice(self):
+ # Test extended slicing by comparing with list slicing
+ # (Assumes list conversion works correctly, too)
+ a = array.array(self.typecode, self.example)
+ indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100)
+ for start in indices:
+ for stop in indices:
+ # Everything except the initial 0 (invalid step)
+ for step in indices[1:]:
+ self.assertEqual(list(a[start:stop:step]),
+ list(a)[start:stop:step])
+
def test_setslice(self):
a = array.array(self.typecode, self.example)
a[:1] = a
@@ -551,12 +563,34 @@ class BaseTest(unittest.TestCase):
a = array.array(self.typecode, self.example)
self.assertRaises(TypeError, a.__setslice__, 0, 0, None)
+ self.assertRaises(TypeError, a.__setitem__, slice(0, 0), None)
self.assertRaises(TypeError, a.__setitem__, slice(0, 1), None)
b = array.array(self.badtypecode())
self.assertRaises(TypeError, a.__setslice__, 0, 0, b)
+ self.assertRaises(TypeError, a.__setitem__, slice(0, 0), b)
self.assertRaises(TypeError, a.__setitem__, slice(0, 1), b)
+ def test_extended_set_del_slice(self):
+ indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100)
+ for start in indices:
+ for stop in indices:
+ # Everything except the initial 0 (invalid step)
+ for step in indices[1:]:
+ a = array.array(self.typecode, self.example)
+ L = list(a)
+ # Make sure we have a slice of exactly the right length,
+ # but with (hopefully) different data.
+ data = L[start:stop:step]
+ data.reverse()
+ L[start:stop:step] = data
+ a[start:stop:step] = array.array(self.typecode, data)
+ self.assertEquals(a, array.array(self.typecode, L))
+
+ del L[start:stop:step]
+ del a[start:stop:step]
+ self.assertEquals(a, array.array(self.typecode, L))
+
def test_index(self):
example = 2*self.example
a = array.array(self.typecode, example)
diff --git a/Lib/test/test_buffer.py b/Lib/test/test_buffer.py
index eb6e9ea..834c05b 100644
--- a/Lib/test/test_buffer.py
+++ b/Lib/test/test_buffer.py
@@ -37,6 +37,18 @@ class BufferTests(unittest.TestCase):
self.failIf(a == b)
self.assertRaises(TypeError, lambda: a < b)
+ def test_extended_getslice(self):
+ # Test extended slicing by comparing with list slicing.
+ s = bytes(range(255, -1, -1))
+ b = buffer(s)
+ indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
+ for start in indices:
+ for stop in indices:
+ # Skip step 0 (invalid)
+ for step in indices[1:]:
+ self.assertEqual(b[start:stop:step],
+ s[start:stop:step])
+
def test_main():
test_support.run_unittest(BufferTests)
diff --git a/Lib/test/test_class.py b/Lib/test/test_class.py
index 003b4a5..bde63a8 100644
--- a/Lib/test/test_class.py
+++ b/Lib/test/test_class.py
@@ -1,6 +1,9 @@
"Test the functionality of Python classes implementing operators."
-from test.test_support import TestFailed
+import unittest
+import sys
+
+from test import test_support
testmeths = [
@@ -53,340 +56,540 @@ testmeths = [
# "str",
# "repr",
# "int",
-# "long",
# "float",
-# "oct",
-# "hex",
# These are separate because they can influence the test of other methods.
# "getattr",
# "setattr",
# "delattr",
-class AllTests:
- def __hash__(self, *args):
- print("__hash__:", args)
- return hash(id(self))
-
- def __str__(self, *args):
- print("__str__:", args)
- return "AllTests"
-
- def __repr__(self, *args):
- print("__repr__:", args)
- return "AllTests"
-
- def __int__(self, *args):
- print("__int__:", args)
- return 1
-
- def __index__(self, *args):
- print("__index__:", args)
- return 1
-
- def __float__(self, *args):
- print("__float__:", args)
- return 1.0
-
- def __cmp__(self, *args):
- print("__cmp__:", args)
- return 0
-
- def __eq__(self, *args):
- print("__eq__:", args)
- return True
-
- def __ne__(self, *args):
- print("__ne__:", args)
- return False
-
- def __lt__(self, *args):
- print("__lt__:", args)
- return False
-
- def __le__(self, *args):
- print("__le__:", args)
- return True
-
- def __gt__(self, *args):
- print("__gt__:", args)
- return False
-
- def __ge__(self, *args):
- print("__ge__:", args)
- return True
-
- def __del__(self, *args):
- print("__del__:", args)
+callLst = []
+def trackCall(f):
+ def track(*args, **kwargs):
+ callLst.append((f.__name__, args))
+ return f(*args, **kwargs)
+ return track
+
+statictests = """
+@trackCall
+def __hash__(self, *args):
+ return hash(id(self))
+
+@trackCall
+def __str__(self, *args):
+ return "AllTests"
+
+@trackCall
+def __repr__(self, *args):
+ return "AllTests"
+
+@trackCall
+def __int__(self, *args):
+ return 1
+
+@trackCall
+def __index__(self, *args):
+ return 1
+
+@trackCall
+def __float__(self, *args):
+ return 1.0
+
+@trackCall
+def __cmp__(self, *args):
+ return 0
+
+@trackCall
+def __eq__(self, *args):
+ return True
+
+@trackCall
+def __ne__(self, *args):
+ return False
+
+@trackCall
+def __lt__(self, *args):
+ return False
+
+@trackCall
+def __le__(self, *args):
+ return True
+
+@trackCall
+def __gt__(self, *args):
+ return False
+
+@trackCall
+def __ge__(self, *args):
+ return True
+"""
-# Synthesize AllTests methods from the names in testmeths.
+# Synthesize all the other AllTests methods from the names in testmeths.
method_template = """\
-def __%(method)s__(self, *args):
- print("__%(method)s__:", args)
+@trackCall
+def __%s__(self, *args):
+ pass
"""
d = {}
+exec(statictests, globals(), d)
for method in testmeths:
- exec(method_template % locals(), d)
-for k in d:
- setattr(AllTests, k, d[k])
-del d, k
-del method, method_template
-
-# this also tests __init__ of course.
-testme = AllTests()
-
-# Binary operations
-
-testme + 1
-1 + testme
-
-testme - 1
-1 - testme
-
-testme * 1
-1 * testme
-
-testme / 1
-1 / testme
-
-testme % 1
-1 % testme
+ exec(method_template % method, globals(), d)
+AllTests = type("AllTests", (object,), d)
+del d, statictests, method, method_template
-divmod(testme,1)
-divmod(1, testme)
+class ClassTests(unittest.TestCase):
+ def setUp(self):
+ callLst[:] = []
-testme ** 1
-1 ** testme
+ def assertCallStack(self, expected_calls):
+ actualCallList = callLst[:] # need to copy because the comparison below will add
+ # additional calls to callLst
+ if expected_calls != actualCallList:
+ self.fail("Expected call list:\n %s\ndoes not match actual call list\n %s" %
+ (expected_calls, actualCallList))
-testme >> 1
-1 >> testme
+ def testInit(self):
+ foo = AllTests()
+ self.assertCallStack([("__init__", (foo,))])
-testme << 1
-1 << testme
+ def testBinaryOps(self):
+ testme = AllTests()
+ # Binary operations
-testme & 1
-1 & testme
+ callLst[:] = []
+ testme + 1
+ self.assertCallStack([("__add__", (testme, 1))])
-testme | 1
-1 | testme
+ callLst[:] = []
+ 1 + testme
+ self.assertCallStack([("__radd__", (testme, 1))])
-testme ^ 1
-1 ^ testme
+ callLst[:] = []
+ testme - 1
+ self.assertCallStack([("__sub__", (testme, 1))])
+ callLst[:] = []
+ 1 - testme
+ self.assertCallStack([("__rsub__", (testme, 1))])
-# List/dict operations
-
-class Empty: pass
-
-try:
- 1 in Empty()
- print('failed, should have raised TypeError')
-except TypeError:
- pass
-
-1 in testme
-
-testme[1]
-testme[1] = 1
-del testme[1]
-
-testme[:42]
-testme[:42] = "The Answer"
-del testme[:42]
+ callLst[:] = []
+ testme * 1
+ self.assertCallStack([("__mul__", (testme, 1))])
-testme[2:1024:10]
-testme[2:1024:10] = "A lot"
-del testme[2:1024:10]
+ callLst[:] = []
+ 1 * testme
+ self.assertCallStack([("__rmul__", (testme, 1))])
-testme[:42, ..., :24:, 24, 100]
-testme[:42, ..., :24:, 24, 100] = "Strange"
-del testme[:42, ..., :24:, 24, 100]
+ if 1/2 == 0:
+ callLst[:] = []
+ testme / 1
+ self.assertCallStack([("__div__", (testme, 1))])
-# Now remove the slice hooks to see if converting normal slices to slice
-# object works.
+ callLst[:] = []
+ 1 / testme
+ self.assertCallStack([("__rdiv__", (testme, 1))])
-del AllTests.__getslice__
-del AllTests.__setslice__
-del AllTests.__delslice__
-
-import sys
-if sys.platform[:4] != 'java':
- testme[:42]
- testme[:42] = "The Answer"
- del testme[:42]
-else:
- # This works under Jython, but the actual slice values are
- # different.
- print("__getitem__: (slice(0, 42, None),)")
- print("__setitem__: (slice(0, 42, None), 'The Answer')")
- print("__delitem__: (slice(0, 42, None),)")
-
-# Unary operations
-
--testme
-+testme
-abs(testme)
-int(testme)
-int(testme)
-float(testme)
-oct(testme)
-
-# And the rest...
-
-hash(testme)
-repr(testme)
-str(testme)
-
-testme == 1
-testme < 1
-testme > 1
-testme != 1
-1 == testme
-1 < testme
-1 > testme
-1 != testme
-
-# This test has to be last (duh.)
-
-del testme
-if sys.platform[:4] == 'java':
- import java
- java.lang.System.gc()
-
-# Interfering tests
-
-class ExtraTests:
- def __getattr__(self, *args):
- print("__getattr__:", args)
- return "SomeVal"
-
- def __setattr__(self, *args):
- print("__setattr__:", args)
-
- def __delattr__(self, *args):
- print("__delattr__:", args)
-
-testme = ExtraTests()
-testme.spam
-testme.eggs = "spam, spam, spam and ham"
-del testme.cardinal
-
-
-# return values of some method are type-checked
-class BadTypeClass:
- def __int__(self):
- return None
- __float__ = __int__
- __str__ = __int__
- __repr__ = __int__
-
-def check_exc(stmt, exception):
- """Raise TestFailed if executing 'stmt' does not raise 'exception'
- """
- try:
- exec(stmt)
- except exception:
- pass
- else:
- raise TestFailed, "%s should raise %s" % (stmt, exception)
-
-check_exc("int(BadTypeClass())", TypeError)
-check_exc("float(BadTypeClass())", TypeError)
-check_exc("str(BadTypeClass())", TypeError)
-check_exc("repr(BadTypeClass())", TypeError)
-check_exc("oct(BadTypeClass())", TypeError)
-check_exc("hex(BadTypeClass())", TypeError)
-
-# Test correct errors from hash() on objects with comparisons but no __hash__
-
-class C0:
- pass
+ callLst[:] = []
+ testme % 1
+ self.assertCallStack([("__mod__", (testme, 1))])
-hash(C0()) # This should work; the next two should raise TypeError
+ callLst[:] = []
+ 1 % testme
+ self.assertCallStack([("__rmod__", (testme, 1))])
-class C1:
- def __cmp__(self, other): return 0
-check_exc("hash(C1())", TypeError)
+ callLst[:] = []
+ divmod(testme,1)
+ self.assertCallStack([("__divmod__", (testme, 1))])
-class C2:
- def __eq__(self, other): return 1
+ callLst[:] = []
+ divmod(1, testme)
+ self.assertCallStack([("__rdivmod__", (testme, 1))])
-check_exc("hash(C2())", TypeError)
-
-# Test for SF bug 532646
-
-class A:
- pass
-A.__call__ = A()
-a = A()
-try:
- a() # This should not segfault
-except RuntimeError:
- pass
-else:
- raise TestFailed, "how could this not have overflowed the stack?"
-
-
-# Tests for exceptions raised in instance_getattr2().
-
-def booh(self):
- raise AttributeError, "booh"
-
-class A:
- a = property(booh)
-try:
- A().a # Raised AttributeError: A instance has no attribute 'a'
-except AttributeError as x:
- if str(x) != "booh":
- print("attribute error for A().a got masked:", str(x))
-
-class E:
- __eq__ = property(booh)
-E() == E() # In debug mode, caused a C-level assert() to fail
-
-class I:
- __init__ = property(booh)
-try:
- I() # In debug mode, printed XXX undetected error and raises AttributeError
-except AttributeError as x:
- pass
-else:
- print("attribute error for I.__init__ got masked")
-
-
-# Test comparison and hash of methods
-class A:
- def __init__(self, x):
- self.x = x
- def f(self):
- pass
- def g(self):
- pass
- def __eq__(self, other):
- return self.x == other.x
- def __hash__(self):
- return self.x
-class B(A):
- pass
+ callLst[:] = []
+ testme ** 1
+ self.assertCallStack([("__pow__", (testme, 1))])
-a1 = A(1)
-a2 = A(2)
-assert a1.f == a1.f
-assert a1.f != a2.f
-assert a1.f != a1.g
-assert a1.f == A(1).f
-assert hash(a1.f) == hash(a1.f)
-assert hash(a1.f) == hash(A(1).f)
-
-assert A.f != a1.f
-assert A.f != A.g
-assert B.f == A.f
-assert hash(B.f) == hash(A.f)
-
-# the following triggers a SystemError in 2.4
-a = A(hash(A.f.im_func)^(-1))
-hash(a.f)
+ callLst[:] = []
+ 1 ** testme
+ self.assertCallStack([("__rpow__", (testme, 1))])
+
+ callLst[:] = []
+ testme >> 1
+ self.assertCallStack([("__rshift__", (testme, 1))])
+
+ callLst[:] = []
+ 1 >> testme
+ self.assertCallStack([("__rrshift__", (testme, 1))])
+
+ callLst[:] = []
+ testme << 1
+ self.assertCallStack([("__lshift__", (testme, 1))])
+
+ callLst[:] = []
+ 1 << testme
+ self.assertCallStack([("__rlshift__", (testme, 1))])
+
+ callLst[:] = []
+ testme & 1
+ self.assertCallStack([("__and__", (testme, 1))])
+
+ callLst[:] = []
+ 1 & testme
+ self.assertCallStack([("__rand__", (testme, 1))])
+
+ callLst[:] = []
+ testme | 1
+ self.assertCallStack([("__or__", (testme, 1))])
+
+ callLst[:] = []
+ 1 | testme
+ self.assertCallStack([("__ror__", (testme, 1))])
+
+ callLst[:] = []
+ testme ^ 1
+ self.assertCallStack([("__xor__", (testme, 1))])
+
+ callLst[:] = []
+ 1 ^ testme
+ self.assertCallStack([("__rxor__", (testme, 1))])
+
+ def testListAndDictOps(self):
+ testme = AllTests()
+
+ # List/dict operations
+
+ class Empty: pass
+
+ try:
+ 1 in Empty()
+ self.fail('failed, should have raised TypeError')
+ except TypeError:
+ pass
+
+ callLst[:] = []
+ 1 in testme
+ self.assertCallStack([('__contains__', (testme, 1))])
+
+ callLst[:] = []
+ testme[1]
+ self.assertCallStack([('__getitem__', (testme, 1))])
+
+ callLst[:] = []
+ testme[1] = 1
+ self.assertCallStack([('__setitem__', (testme, 1, 1))])
+
+ callLst[:] = []
+ del testme[1]
+ self.assertCallStack([('__delitem__', (testme, 1))])
+
+ callLst[:] = []
+ testme[:42]
+ self.assertCallStack([('__getslice__', (testme, 0, 42))])
+
+ callLst[:] = []
+ testme[:42] = "The Answer"
+ self.assertCallStack([('__setslice__', (testme, 0, 42, "The Answer"))])
+
+ callLst[:] = []
+ del testme[:42]
+ self.assertCallStack([('__delslice__', (testme, 0, 42))])
+
+ callLst[:] = []
+ testme[2:1024:10]
+ self.assertCallStack([('__getitem__', (testme, slice(2, 1024, 10)))])
+
+ callLst[:] = []
+ testme[2:1024:10] = "A lot"
+ self.assertCallStack([('__setitem__', (testme, slice(2, 1024, 10),
+ "A lot"))])
+ callLst[:] = []
+ del testme[2:1024:10]
+ self.assertCallStack([('__delitem__', (testme, slice(2, 1024, 10)))])
+
+ callLst[:] = []
+ testme[:42, ..., :24:, 24, 100]
+ self.assertCallStack([('__getitem__', (testme, (slice(None, 42, None),
+ Ellipsis,
+ slice(None, 24, None),
+ 24, 100)))])
+ callLst[:] = []
+ testme[:42, ..., :24:, 24, 100] = "Strange"
+ self.assertCallStack([('__setitem__', (testme, (slice(None, 42, None),
+ Ellipsis,
+ slice(None, 24, None),
+ 24, 100), "Strange"))])
+ callLst[:] = []
+ del testme[:42, ..., :24:, 24, 100]
+ self.assertCallStack([('__delitem__', (testme, (slice(None, 42, None),
+ Ellipsis,
+ slice(None, 24, None),
+ 24, 100)))])
+
+ # Now remove the slice hooks to see if converting normal slices to
+ # slice object works.
+
+ getslice = AllTests.__getslice__
+ del AllTests.__getslice__
+ setslice = AllTests.__setslice__
+ del AllTests.__setslice__
+ delslice = AllTests.__delslice__
+ del AllTests.__delslice__
+
+ # XXX when using new-style classes the slice testme[:42] produces
+ # slice(None, 42, None) instead of slice(0, 42, None). py3k will have
+ # to change this test.
+ callLst[:] = []
+ testme[0:42]
+ self.assertCallStack([('__getitem__', (testme, slice(0, 42, None)))])
+
+ callLst[:] = []
+ testme[:42] = "The Answer"
+ self.assertCallStack([('__setitem__', (testme, slice(None, 42, None),
+ "The Answer"))])
+ callLst[:] = []
+ del testme[0:42]
+ self.assertCallStack([('__delitem__', (testme, slice(0, 42, None)))])
+
+ # Restore the slice methods, or the tests will fail with regrtest -R.
+ AllTests.__getslice__ = getslice
+ AllTests.__setslice__ = setslice
+ AllTests.__delslice__ = delslice
+
+
+ def testUnaryOps(self):
+ testme = AllTests()
+
+ callLst[:] = []
+ -testme
+ self.assertCallStack([('__neg__', (testme,))])
+ callLst[:] = []
+ +testme
+ self.assertCallStack([('__pos__', (testme,))])
+ callLst[:] = []
+ abs(testme)
+ self.assertCallStack([('__abs__', (testme,))])
+ callLst[:] = []
+ int(testme)
+ self.assertCallStack([('__int__', (testme,))])
+ callLst[:] = []
+ float(testme)
+ self.assertCallStack([('__float__', (testme,))])
+ callLst[:] = []
+ oct(testme)
+ self.assertCallStack([('__index__', (testme,))])
+ callLst[:] = []
+ hex(testme)
+ self.assertCallStack([('__index__', (testme,))])
+
+
+ def testMisc(self):
+ testme = AllTests()
+
+ callLst[:] = []
+ hash(testme)
+ self.assertCallStack([('__hash__', (testme,))])
+
+ callLst[:] = []
+ repr(testme)
+ self.assertCallStack([('__repr__', (testme,))])
+
+ callLst[:] = []
+ str(testme)
+ self.assertCallStack([('__str__', (testme,))])
+
+ callLst[:] = []
+ testme == 1
+ self.assertCallStack([('__eq__', (testme, 1))])
+
+ callLst[:] = []
+ testme < 1
+ self.assertCallStack([('__lt__', (testme, 1))])
+
+ callLst[:] = []
+ testme > 1
+ self.assertCallStack([('__gt__', (testme, 1))])
+
+ callLst[:] = []
+ testme != 1
+ self.assertCallStack([('__ne__', (testme, 1))])
+
+ callLst[:] = []
+ 1 == testme
+ self.assertCallStack([('__eq__', (1, testme))])
+
+ callLst[:] = []
+ 1 < testme
+ self.assertCallStack([('__gt__', (1, testme))])
+
+ callLst[:] = []
+ 1 > testme
+ self.assertCallStack([('__lt__', (1, testme))])
+
+ callLst[:] = []
+ 1 != testme
+ self.assertCallStack([('__ne__', (1, testme))])
+
+
+ def testGetSetAndDel(self):
+ # Interfering tests
+ class ExtraTests(AllTests):
+ @trackCall
+ def __getattr__(self, *args):
+ return "SomeVal"
+
+ @trackCall
+ def __setattr__(self, *args):
+ pass
+
+ @trackCall
+ def __delattr__(self, *args):
+ pass
+
+ testme = ExtraTests()
+
+ callLst[:] = []
+ testme.spam
+ self.assertCallStack([('__getattr__', (testme, "spam"))])
+
+ callLst[:] = []
+ testme.eggs = "spam, spam, spam and ham"
+ self.assertCallStack([('__setattr__', (testme, "eggs",
+ "spam, spam, spam and ham"))])
+
+ callLst[:] = []
+ del testme.cardinal
+ self.assertCallStack([('__delattr__', (testme, "cardinal"))])
+
+ def testDel(self):
+ x = []
+
+ class DelTest:
+ def __del__(self):
+ x.append("crab people, crab people")
+ testme = DelTest()
+ del testme
+ import gc
+ gc.collect()
+ self.assertEquals(["crab people, crab people"], x)
+
+ def testBadTypeReturned(self):
+ # return values of some method are type-checked
+ class BadTypeClass:
+ def __int__(self):
+ return None
+ __float__ = __int__
+ __str__ = __int__
+ __repr__ = __int__
+ __oct__ = __int__
+ __hex__ = __int__
+
+ for f in [int, float, str, repr, oct, hex]:
+ self.assertRaises(TypeError, f, BadTypeClass())
+
+ def testHashStuff(self):
+ # Test correct errors from hash() on objects with comparisons but
+ # no __hash__
+
+ class C0:
+ pass
+
+ hash(C0()) # This should work; the next two should raise TypeError
+
+ class C1:
+ def __cmp__(self, other): return 0
+
+ self.assertRaises(TypeError, hash, C1())
+
+ class C2:
+ def __eq__(self, other): return 1
+
+ self.assertRaises(TypeError, hash, C2())
+
+
+ def testSFBug532646(self):
+ # Test for SF bug 532646
+
+ class A:
+ pass
+ A.__call__ = A()
+ a = A()
+
+ try:
+ a() # This should not segfault
+ except RuntimeError:
+ pass
+ else:
+ self.fail("Failed to raise RuntimeError")
+
+ def testForExceptionsRaisedInInstanceGetattr2(self):
+ # Tests for exceptions raised in instance_getattr2().
+
+ def booh(self):
+ raise AttributeError("booh")
+
+ class A:
+ a = property(booh)
+ try:
+ A().a # Raised AttributeError: A instance has no attribute 'a'
+ except AttributeError as x:
+ if str(x) != "booh":
+ self.fail("attribute error for A().a got masked: %s" % x)
+
+ class E:
+ __eq__ = property(booh)
+ E() == E() # In debug mode, caused a C-level assert() to fail
+
+ class I:
+ __init__ = property(booh)
+ try:
+ # In debug mode, printed XXX undetected error and
+ # raises AttributeError
+ I()
+ except AttributeError as x:
+ pass
+ else:
+ self.fail("attribute error for I.__init__ got masked")
+
+ def testHashComparisonOfMethods(self):
+ # Test comparison and hash of methods
+ class A:
+ def __init__(self, x):
+ self.x = x
+ def f(self):
+ pass
+ def g(self):
+ pass
+ def __eq__(self, other):
+ return self.x == other.x
+ def __hash__(self):
+ return self.x
+ class B(A):
+ pass
+
+ a1 = A(1)
+ a2 = A(2)
+ self.assertEquals(a1.f, a1.f)
+ self.assertNotEquals(a1.f, a2.f)
+ self.assertNotEquals(a1.f, a1.g)
+ self.assertEquals(a1.f, A(1).f)
+ self.assertEquals(hash(a1.f), hash(a1.f))
+ self.assertEquals(hash(a1.f), hash(A(1).f))
+
+ self.assertNotEquals(A.f, a1.f)
+ self.assertNotEquals(A.f, A.g)
+ self.assertEquals(B.f, A.f)
+ self.assertEquals(hash(B.f), hash(A.f))
+
+ # the following triggers a SystemError in 2.4
+ a = A(hash(A.f.im_func)^(-1))
+ hash(a.f)
+
+def test_main():
+ test_support.run_unittest(ClassTests)
+
+if __name__=='__main__':
+ test_main()
diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py
index cb1bc9ff..441df09 100644
--- a/Lib/test/test_cmd_line.py
+++ b/Lib/test/test_cmd_line.py
@@ -3,18 +3,25 @@ import test.test_support, unittest
import sys
import subprocess
+def _spawn_python(*args):
+ cmd_line = [sys.executable]
+ cmd_line.extend(args)
+ return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+def _kill_python(p):
+ p.stdin.close()
+ data = p.stdout.read()
+ p.stdout.close()
+ # try to cleanup the child so we don't appear to leak when running
+ # with regrtest -R. This should be a no-op on Windows.
+ subprocess._cleanup()
+ return data
+
class CmdLineTest(unittest.TestCase):
- def start_python(self, cmd_line):
- cmd = '"%s" %s' % (sys.executable, cmd_line)
- p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- p.stdin.close()
- data = p.stdout.read()
- p.stdout.close()
- # try to cleanup the child so we don't appear to leak when running
- # with regrtest -R. This should be a no-op on Windows.
- subprocess._cleanup()
- return data
+ def start_python(self, *args):
+ p = _spawn_python(*args)
+ return _kill_python(p)
def exit_code(self, *args):
cmd_line = [sys.executable]
@@ -72,6 +79,17 @@ class CmdLineTest(unittest.TestCase):
self.exit_code('-m', 'timeit', '-n', '1'),
0)
+ def test_run_module_bug1764407(self):
+ # -m and -i need to play well together
+ # Runs the timeit module and checks the __main__
+ # namespace has been populated appropriately
+ p = _spawn_python('-i', '-m', 'timeit', '-n', '1')
+ p.stdin.write('Timer\n')
+ p.stdin.write('exit()\n')
+ data = _kill_python(p)
+ self.assertTrue(data.find(b'1 loop') != -1)
+ self.assertTrue(data.find(b'__main__.Timer') != -1)
+
def test_run_code(self):
# Test expected operation of the '-c' switch
# Switch needs an argument
diff --git a/Lib/test/test_frozen.py b/Lib/test/test_frozen.py
index 5387e56..e47bb64 100644
--- a/Lib/test/test_frozen.py
+++ b/Lib/test/test_frozen.py
@@ -1,27 +1,40 @@
# Test the frozen module defined in frozen.c.
+from __future__ import with_statement
-from test.test_support import TestFailed
+from test.test_support import captured_stdout, run_unittest
+import unittest
import sys, os
-try:
- import __hello__
-except ImportError as x:
- raise TestFailed, "import __hello__ failed:" + str(x)
-
-try:
- import __phello__
-except ImportError as x:
- raise TestFailed, "import __phello__ failed:" + str(x)
-
-try:
- import __phello__.spam
-except ImportError as x:
- raise TestFailed, "import __phello__.spam failed:" + str(x)
-
-if sys.platform != "mac": # On the Mac this import does succeed.
- try:
- import __phello__.foo
- except ImportError:
- pass
- else:
- raise TestFailed, "import __phello__.foo should have failed"
+class FrozenTests(unittest.TestCase):
+ def test_frozen(self):
+
+ with captured_stdout() as stdout:
+ try:
+ import __hello__
+ except ImportError as x:
+ self.fail("import __hello__ failed:" + str(x))
+
+ try:
+ import __phello__
+ except ImportError as x:
+ self.fail("import __phello__ failed:" + str(x))
+
+ try:
+ import __phello__.spam
+ except ImportError as x:
+ self.fail("import __phello__.spam failed:" + str(x))
+
+ if sys.platform != "mac": # On the Mac this import does succeed.
+ try:
+ import __phello__.foo
+ except ImportError:
+ pass
+ else:
+ self.fail("import __phello__.foo should have failed")
+
+ self.assertEquals(stdout.getvalue(),
+ 'Hello world...\nHello world...\nHello world...\n')
+
+
+def test_main():
+ run_unittest(FrozenTests)
diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py
index b0f9e9e..f2232e4 100644
--- a/Lib/test/test_mmap.py
+++ b/Lib/test/test_mmap.py
@@ -305,6 +305,40 @@ class MmapTests(unittest.TestCase):
m[x] = b
self.assertEqual(m[x], b)
+ def test_extended_getslice(self):
+ # Test extended slicing by comparing with list slicing.
+ s = bytes(reversed(range(256)))
+ m = mmap.mmap(-1, len(s))
+ m[:] = s
+ self.assertEqual(m[:], s)
+ indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
+ for start in indices:
+ for stop in indices:
+ # Skip step 0 (invalid)
+ for step in indices[1:]:
+ self.assertEqual(m[start:stop:step],
+ s[start:stop:step])
+
+ def test_extended_set_del_slice(self):
+ # Test extended slicing by comparing with list slicing.
+ s = bytes(reversed(range(256)))
+ m = mmap.mmap(-1, len(s))
+ indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
+ for start in indices:
+ for stop in indices:
+ # Skip invalid step 0
+ for step in indices[1:]:
+ m[:] = s
+ self.assertEqual(m[:], s)
+ L = list(s)
+ # Make sure we have a slice of exactly the right length,
+ # but with different data.
+ data = L[start:stop:step]
+ data = bytes(reversed(data))
+ L[start:stop:step] = data
+ m[start:stop:step] = data
+ self.assertEquals(m[:], bytes(L))
+
def test_main():
run_unittest(MmapTests)
diff --git a/Lib/test/test_ossaudiodev.py b/Lib/test/test_ossaudiodev.py
index 8da22eb..97d5989 100644
--- a/Lib/test/test_ossaudiodev.py
+++ b/Lib/test/test_ossaudiodev.py
@@ -77,8 +77,7 @@ class OSSAudioDevTests(unittest.TestCase):
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
- print ("playing test sound file (expected running time: %.2f sec)"
- % expected_time)
+ self.assertEquals("%.2f" % expected_time, "2.93")
t1 = time.time()
dsp.write(data)
dsp.close()
@@ -121,7 +120,6 @@ class OSSAudioDevTests(unittest.TestCase):
"setparameters%r: returned %r" % (config, result))
def set_bad_parameters(self, dsp):
-
# Now try some configurations that are presumably bogus: eg. 300
# channels currently exceeds even Hollywood's ambitions, and
# negative sampling rate is utter nonsense. setparameters() should
@@ -166,7 +164,7 @@ class OSSAudioDevTests(unittest.TestCase):
def test_main():
try:
dsp = ossaudiodev.open('w')
- except IOError as msg:
+ except (ossaudiodev.error, IOError) as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise TestSkipped(msg)
diff --git a/Lib/test/test_pkg.py b/Lib/test/test_pkg.py
index 45d0f4f..1e32d2c 100644
--- a/Lib/test/test_pkg.py
+++ b/Lib/test/test_pkg.py
@@ -155,7 +155,8 @@ class Test(unittest.TestCase):
("t4 sub.py", "raise RuntimeError('Shouldnt load sub.py')"),
("t4 sub", None),
("t4 sub __init__.py", ""),
- ("t4 sub subsub.py", "raise RuntimeError('Shouldnt load subsub.py')"),
+ ("t4 sub subsub.py",
+ "raise RuntimeError('Shouldnt load subsub.py')"),
("t4 sub subsub", None),
("t4 sub subsub __init__.py", "spam = 1"),
]
@@ -196,7 +197,8 @@ class Test(unittest.TestCase):
def test_6(self):
hier = [
("t6", None),
- ("t6 __init__.py", "__all__ = ['spam', 'ham', 'eggs']"),
+ ("t6 __init__.py",
+ "__all__ = ['spam', 'ham', 'eggs']"),
("t6 spam.py", ""),
("t6 ham.py", ""),
("t6 eggs.py", ""),
@@ -223,10 +225,11 @@ class Test(unittest.TestCase):
("t7.py", ""),
("t7", None),
("t7 __init__.py", ""),
- ("t7 sub.py", "raise RuntimeError('Shouldnt load sub.py')"),
+ ("t7 sub.py",
+ "raise RuntimeError('Shouldnt load sub.py')"),
("t7 sub", None),
("t7 sub __init__.py", ""),
- ("t7 sub subsub.py",
+ ("t7 sub .py",
"raise RuntimeError('Shouldnt load subsub.py')"),
("t7 sub subsub", None),
("t7 sub subsub __init__.py",
diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py
index b628d77..b003f17 100644
--- a/Lib/test/test_runpy.py
+++ b/Lib/test/test_runpy.py
@@ -5,7 +5,7 @@ import os.path
import sys
import tempfile
from test.test_support import verbose, run_unittest, forget
-from runpy import _run_module_code, run_module
+from runpy import _run_code, _run_module_code, _run_module_as_main, run_module
# Set up the test code and expected results
@@ -29,6 +29,16 @@ class RunModuleCodeTest(unittest.TestCase):
"nested = runpy._run_module_code('x=1\\n', mod_name='<run>')\n"
)
+ def test_run_code(self):
+ saved_argv0 = sys.argv[0]
+ d = _run_code(self.test_source, {})
+ self.failUnless(d["result"] == self.expected_result)
+ self.failUnless(d["__name__"] is None)
+ self.failUnless(d["__file__"] is None)
+ self.failUnless(d["__loader__"] is None)
+ self.failUnless(d["run_argv0"] is saved_argv0)
+ self.failUnless("run_name" not in d)
+ self.failUnless(sys.argv[0] is saved_argv0)
def test_run_module_code(self):
initial = object()
@@ -37,44 +47,24 @@ class RunModuleCodeTest(unittest.TestCase):
loader = "Now you're just being silly"
d1 = dict(initial=initial)
saved_argv0 = sys.argv[0]
- try:
- d2 = _run_module_code(self.test_source,
- d1,
- name,
- file,
- loader,
- alter_sys=True)
- self.failUnless("result" not in d1)
- self.failUnless(d2["initial"] is initial)
- self.assertEqual(d2["result"], self.expected_result)
- self.assertEqual(d2["nested"]["x"], 1)
- self.assertEqual(d2["nested"]["__name__"], "<run>")
- self.failUnless(d2["__name__"] is name)
- self.failUnless(d2["__file__"] is file)
- self.failUnless(d2["__loader__"] is loader)
- self.failUnless(d2["run_argv0"] is file)
- self.failUnless(d2["run_name_in_sys_modules"])
- self.failUnless(d2["module_in_sys_modules"])
- self.failUnless(sys.argv[0] is not saved_argv0)
- self.failUnless(name in sys.modules)
- finally:
- sys.argv[0] = saved_argv0
- if name in sys.modules:
- del sys.modules[name]
-
- def test_run_module_code_defaults(self):
- saved_argv0 = sys.argv[0]
- d = _run_module_code(self.test_source)
- self.assertEqual(d["result"], self.expected_result)
- self.failUnless(d["nested"]["x"] == 1)
- self.failUnless(d["nested"]["__name__"] == "<run>")
- self.failUnless(d["__name__"] is None)
- self.failUnless(d["__file__"] is None)
- self.failUnless(d["__loader__"] is None)
- self.failUnless(d["run_argv0"] is saved_argv0)
- self.failUnless(not d["run_name_in_sys_modules"])
+ d2 = _run_module_code(self.test_source,
+ d1,
+ name,
+ file,
+ loader)
+ self.failUnless("result" not in d1)
+ self.failUnless(d2["initial"] is initial)
+ self.assertEqual(d2["result"], self.expected_result)
+ self.assertEqual(d2["nested"]["x"], 1)
+ self.failUnless(d2["__name__"] is name)
+ self.failUnless(d2["run_name_in_sys_modules"])
+ self.failUnless(d2["module_in_sys_modules"])
+ self.failUnless(d2["__file__"] is file)
+ self.failUnless(d2["run_argv0"] is file)
+ self.failUnless(d2["__loader__"] is loader)
self.failUnless(sys.argv[0] is saved_argv0)
- self.failUnless(None not in sys.modules)
+ self.failUnless(name not in sys.modules)
+
class RunModuleTest(unittest.TestCase):
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 15c6e9c..c1df51e 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -1,167 +1,178 @@
-# Test the signal module
-from test.test_support import verbose, TestSkipped, TestFailed, vereq
+import unittest
+from test import test_support
import signal
import os, sys, time
-if sys.platform[:3] in ('win', 'os2'):
- raise TestSkipped, "Can't test signal on %s" % sys.platform
-
-MAX_DURATION = 20 # Entire test should last at most 20 sec.
-
-if verbose:
- x = '-x'
-else:
- x = '+x'
-
-pid = os.getpid()
-if verbose:
- print("test runner's pid is", pid)
-
-# Shell script that will send us asynchronous signals
-script = """
- (
- set %(x)s
- sleep 2
- kill -HUP %(pid)d
- sleep 2
- kill -USR1 %(pid)d
- sleep 2
- kill -USR2 %(pid)d
- ) &
-""" % vars()
-
-a_called = b_called = False
-
-def handlerA(*args):
- global a_called
- a_called = True
- if verbose:
- print("handlerA invoked", args)
-
class HandlerBCalled(Exception):
pass
-def handlerB(*args):
- global b_called
- b_called = True
- if verbose:
- print("handlerB invoked", args)
- raise HandlerBCalled, args
-
-# Set up a child to send signals to us (the parent) after waiting long
-# enough to receive the alarm. It seems we miss the alarm for some
-# reason. This will hopefully stop the hangs on Tru64/Alpha.
-# Alas, it doesn't. Tru64 appears to miss all the signals at times, or
-# seemingly random subsets of them, and nothing done in force_test_exit
-# so far has actually helped.
-def force_test_exit():
- # Sigh, both imports seem necessary to avoid errors.
- import os
- fork_pid = os.fork()
- if fork_pid:
- # In parent.
- return fork_pid
-
- # In child.
- import os, time
- try:
- # Wait 5 seconds longer than the expected alarm to give enough
- # time for the normal sequence of events to occur. This is
- # just a stop-gap to try to prevent the test from hanging.
- time.sleep(MAX_DURATION + 5)
- print(' child should not have to kill parent', file=sys.__stdout__)
- for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
- os.kill(pid, getattr(signal, signame))
- print(" child sent", signame, "to", pid, file=sys.__stdout__)
- time.sleep(1)
- finally:
- os._exit(0)
-
-# Install handlers.
-hup = signal.signal(signal.SIGHUP, handlerA)
-usr1 = signal.signal(signal.SIGUSR1, handlerB)
-usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
-alrm = signal.signal(signal.SIGALRM, signal.default_int_handler)
-
-try:
-
- signal.alarm(MAX_DURATION)
- vereq(signal.getsignal(signal.SIGHUP), handlerA)
- vereq(signal.getsignal(signal.SIGUSR1), handlerB)
- vereq(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
- vereq(signal.getsignal(signal.SIGALRM), signal.default_int_handler)
-
- # Try to ensure this test exits even if there is some problem with alarm.
- # Tru64/Alpha often hangs and is ultimately killed by the buildbot.
- fork_pid = force_test_exit()
-
- try:
- signal.getsignal(4242)
- raise TestFailed('expected ValueError for invalid signal # to '
- 'getsignal()')
- except ValueError:
- pass
-
- try:
- signal.signal(4242, handlerB)
- raise TestFailed('expected ValueError for invalid signal # to '
- 'signal()')
- except ValueError:
- pass
-
- try:
- signal.signal(signal.SIGUSR1, None)
- raise TestFailed('expected TypeError for non-callable')
- except TypeError:
- pass
-
- # Launch an external script to send us signals.
- # We expect the external script to:
- # send HUP, which invokes handlerA to set a_called
- # send USR1, which invokes handlerB to set b_called and raise
- # HandlerBCalled
- # send USR2, which is ignored
- #
- # Then we expect the alarm to go off, and its handler raises
- # KeyboardInterrupt, finally getting us out of the loop.
- os.system(script)
- try:
- print("starting pause() loop...")
- while 1:
- try:
- if verbose:
- print("call pause()...")
- signal.pause()
- if verbose:
- print("pause() returned")
- except HandlerBCalled:
- if verbose:
- print("HandlerBCalled exception caught")
-
- except KeyboardInterrupt:
- if verbose:
- print("KeyboardInterrupt (the alarm() went off)")
-
- if not a_called:
- print('HandlerA not called')
-
- if not b_called:
- print('HandlerB not called')
-
-finally:
- # Forcibly kill the child we created to ping us if there was a test error.
- try:
- # Make sure we don't kill ourself if there was a fork error.
- if fork_pid > 0:
- os.kill(fork_pid, signal.SIGKILL)
- except:
- # If the child killed us, it has probably exited. Killing a
- # non-existent process will raise an error which we don't care about.
- pass
-
- # Restore handlers.
- signal.alarm(0) # cancel alarm in case we died early
- signal.signal(signal.SIGHUP, hup)
- signal.signal(signal.SIGUSR1, usr1)
- signal.signal(signal.SIGUSR2, usr2)
- signal.signal(signal.SIGALRM, alrm)
+class InterProcessSignalTests(unittest.TestCase):
+ MAX_DURATION = 20 # Entire test should last at most 20 sec.
+
+ # Set up a child to send signals to us (the parent) after waiting
+ # long enough to receive the alarm. It seems we miss the alarm
+ # for some reason. This will hopefully stop the hangs on
+ # Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the
+ # signals at times, or seemingly random subsets of them, and
+ # nothing done in force_test_exit so far has actually helped.
+ def spawn_force_test_exit_process(self, parent_pid):
+ # Sigh, both imports seem necessary to avoid errors.
+ import os
+ fork_pid = os.fork()
+ if fork_pid:
+ # In parent.
+ return fork_pid
+
+ # In child.
+ import os, time
+ try:
+ # Wait 5 seconds longer than the expected alarm to give enough
+ # time for the normal sequence of events to occur. This is
+ # just a stop-gap to try to prevent the test from hanging.
+ time.sleep(self.MAX_DURATION + 5)
+ print(" child should not have to kill parent",
+ file=sys.__stdout__)
+ for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
+ os.kill(parent_pid, getattr(signal, signame))
+ print(" child sent", signame, "to",
+ parent_pid, file=sys.__stdout__)
+ time.sleep(1)
+ finally:
+ os._exit(0)
+
+ def handlerA(self, *args):
+ self.a_called = True
+ if test_support.verbose:
+ print("handlerA invoked", args)
+
+ def handlerB(self, *args):
+ self.b_called = True
+ if test_support.verbose:
+ print("handlerB invoked", args)
+ raise HandlerBCalled(*args)
+
+ def test_main(self):
+ self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA)
+ self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB)
+ self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
+ self.assertEquals(signal.getsignal(signal.SIGALRM),
+ signal.default_int_handler)
+
+ # Launch an external script to send us signals.
+ # We expect the external script to:
+ # send HUP, which invokes handlerA to set a_called
+ # send USR1, which invokes handlerB to set b_called and raise
+ # HandlerBCalled
+ # send USR2, which is ignored
+ #
+ # Then we expect the alarm to go off, and its handler raises
+ # KeyboardInterrupt, finally getting us out of the loop.
+
+ if test_support.verbose:
+ verboseflag = '-x'
+ else:
+ verboseflag = '+x'
+
+ pid = self.pid
+ if test_support.verbose:
+ print("test runner's pid is", pid)
+
+ # Shell script that will send us asynchronous signals
+ script = """
+ (
+ set %(verboseflag)s
+ sleep 2
+ kill -HUP %(pid)d
+ sleep 2
+ kill -USR1 %(pid)d
+ sleep 2
+ kill -USR2 %(pid)d
+ ) &
+ """ % vars()
+
+ signal.alarm(self.MAX_DURATION)
+
+ handler_b_exception_raised = False
+
+ os.system(script)
+ try:
+ if test_support.verbose:
+ print("starting pause() loop...")
+ while 1:
+ try:
+ if test_support.verbose:
+ print("call pause()...")
+ signal.pause()
+ if test_support.verbose:
+ print("pause() returned")
+ except HandlerBCalled:
+ handler_b_exception_raised = True
+ if test_support.verbose:
+ print("HandlerBCalled exception caught")
+
+ except KeyboardInterrupt:
+ if test_support.verbose:
+ print("KeyboardInterrupt (the alarm() went off)")
+
+ self.assert_(self.a_called)
+ self.assert_(self.b_called)
+ self.assert_(handler_b_exception_raised)
+
+ def setUp(self):
+ # Install handlers.
+ self.hup = signal.signal(signal.SIGHUP, self.handlerA)
+ self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
+ self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+ self.alrm = signal.signal(signal.SIGALRM,
+ signal.default_int_handler)
+ self.a_called = False
+ self.b_called = False
+ self.pid = os.getpid()
+ self.fork_pid = self.spawn_force_test_exit_process(self.pid)
+
+ def tearDown(self):
+ # Forcibly kill the child we created to ping us if there was a
+ # test error.
+ try:
+ # Make sure we don't kill ourself if there was a fork
+ # error.
+ if self.fork_pid > 0:
+ os.kill(self.fork_pid, signal.SIGKILL)
+ except:
+ # If the child killed us, it has probably exited. Killing
+ # a non-existent process will raise an error which we
+ # don't care about.
+ pass
+
+ # Restore handlers.
+ signal.alarm(0) # cancel alarm in case we died early
+ signal.signal(signal.SIGHUP, self.hup)
+ signal.signal(signal.SIGUSR1, self.usr1)
+ signal.signal(signal.SIGUSR2, self.usr2)
+ signal.signal(signal.SIGALRM, self.alrm)
+
+
+class BasicSignalTests(unittest.TestCase):
+ def test_out_of_range_signal_number_raises_error(self):
+ self.assertRaises(ValueError, signal.getsignal, 4242)
+
+ def trivial_signal_handler(*args):
+ pass
+
+ self.assertRaises(ValueError, signal.signal, 4242,
+ trivial_signal_handler)
+
+ def test_setting_signal_handler_to_none_raises_error(self):
+ self.assertRaises(TypeError, signal.signal,
+ signal.SIGUSR1, None)
+
+def test_main():
+ if sys.platform[:3] in ('win', 'os2'):
+ raise test_support.TestSkipped("Can't test signal on %s" % \
+ sys.platform)
+
+ test_support.run_unittest(BasicSignalTests, InterProcessSignalTests)
+
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
new file mode 100644
index 0000000..35c6af9
--- /dev/null
+++ b/Lib/test/test_ssl.py
@@ -0,0 +1,372 @@
+# Test the support for SSL and sockets
+
+import sys
+import unittest
+from test import test_support
+import socket
+import errno
+import threading
+import subprocess
+import time
+import os
+import pprint
+import urllib
+import shutil
+import traceback
+
+# Optionally test SSL support, if we have it in the tested platform
+skip_expected = False
+try:
+ import ssl
+except ImportError:
+ skip_expected = True
+
+CERTFILE = None
+
+
+def handle_error(prefix):
+ exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
+ sys.stdout.write(prefix + exc_format)
+
+
+class BasicTests(unittest.TestCase):
+
+ def testRudeShutdown(self):
+ # Some random port to connect to.
+ PORT = [9934]
+
+ listener_ready = threading.Event()
+ listener_gone = threading.Event()
+
+ # `listener` runs in a thread. It opens a socket listening on
+ # PORT, and sits in an accept() until the main thread connects.
+ # Then it rudely closes the socket, and sets Event `listener_gone`
+ # to let the main thread know the socket is gone.
+ def listener():
+ s = socket.socket()
+ PORT[0] = test_support.bind_port(s, '', PORT[0])
+ s.listen(5)
+ listener_ready.set()
+ s.accept()
+ s = None # reclaim the socket object, which also closes it
+ listener_gone.set()
+
+ def connector():
+ listener_ready.wait()
+ s = socket.socket()
+ s.connect(('localhost', PORT[0]))
+ listener_gone.wait()
+ try:
+ ssl_sock = socket.ssl(s)
+ except socket.sslerror:
+ pass
+ else:
+ raise test_support.TestFailed(
+ 'connecting to closed SSL socket should have failed')
+
+ t = threading.Thread(target=listener)
+ t.start()
+ connector()
+ t.join()
+
+ def testSSLconnect(self):
+ import os
+ with test_support.transient_internet():
+ s = ssl.sslsocket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE)
+ s.connect(("pop.gmail.com", 995))
+ c = s.getpeercert()
+ if c:
+ raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+ s.close()
+
+ # this should fail because we have no verification certs
+ s = ssl.sslsocket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED)
+ try:
+ s.connect(("pop.gmail.com", 995))
+ except ssl.sslerror:
+ pass
+ finally:
+ s.close()
+
+class ConnectedTests(unittest.TestCase):
+
+ def testTLSecho (self):
+
+ s1 = socket.socket()
+ try:
+ s1.connect(('127.0.0.1', 10024))
+ except:
+ handle_error("connection failure:\n")
+ raise test_support.TestFailed("Can't connect to test server")
+ else:
+ try:
+ c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
+ except:
+ handle_error("SSL handshake failure:\n")
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ else:
+ if not c1:
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ indata = "FOO\n"
+ c1.write(indata)
+ outdata = c1.read()
+ if outdata != indata.lower():
+ raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
+ c1.close()
+
+ def testReadCert(self):
+
+ s2 = socket.socket()
+ try:
+ s2.connect(('127.0.0.1', 10024))
+ except:
+ handle_error("connection failure:\n")
+ raise test_support.TestFailed("Can't connect to test server")
+ else:
+ try:
+ c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
+ cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
+ except:
+ handle_error("SSL handshake failure:\n")
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ else:
+ if not c2:
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ cert = c2.getpeercert()
+ if not cert:
+ raise test_support.TestFailed("Can't get peer certificate.")
+ if not cert.has_key('subject'):
+ raise test_support.TestFailed(
+ "No subject field in certificate: %s." %
+ pprint.pformat(cert))
+ if not (cert['subject'].has_key('organizationName')):
+ raise test_support.TestFailed(
+ "No 'organizationName' field in certificate subject: %s." %
+ pprint.pformat(cert))
+ if (cert['subject']['organizationName'] !=
+ "Python Software Foundation"):
+ raise test_support.TestFailed(
+ "Invalid 'organizationName' field in certificate subject; "
+ "should be 'Python Software Foundation'.");
+ c2.close()
+
+
+class ThreadedEchoServer(threading.Thread):
+
+ class ConnectionHandler(threading.Thread):
+
+ def __init__(self, server, connsock):
+ self.server = server
+ self.running = False
+ self.sock = connsock
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
+
+ def run (self):
+ self.running = True
+ try:
+ sslconn = ssl.sslsocket(self.sock, server_side=True,
+ certfile=self.server.certificate,
+ ssl_version=self.server.protocol,
+ cert_reqs=self.server.certreqs)
+ except:
+ # here, we want to stop the server, because this shouldn't
+ # happen in the context of our test case
+ handle_error("Test server failure:\n")
+ self.running = False
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
+ self.server.stop()
+ return
+
+ while self.running:
+ try:
+ msg = sslconn.read()
+ if not msg:
+ # eof, so quit this handler
+ self.running = False
+ sslconn.close()
+ elif msg.strip() == 'over':
+ sslconn.close()
+ self.server.stop()
+ self.running = False
+ else:
+ if test_support.verbose:
+ sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
+ sslconn.write(msg.lower())
+ except ssl.sslerror:
+ handle_error("Test server failure:\n")
+ sslconn.close()
+ self.running = False
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
+ self.server.stop()
+ except:
+ handle_error('')
+
+ def __init__(self, port, certificate, ssl_version=None,
+ certreqs=None, cacerts=None):
+ if ssl_version is None:
+ ssl_version = ssl.PROTOCOL_TLSv1
+ if certreqs is None:
+ certreqs = ssl.CERT_NONE
+ self.certificate = certificate
+ self.protocol = ssl_version
+ self.certreqs = certreqs
+ self.cacerts = cacerts
+ self.sock = socket.socket()
+ self.flag = None
+ if hasattr(socket, 'SO_REUSEADDR'):
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ if hasattr(socket, 'SO_REUSEPORT'):
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ self.sock.bind(('127.0.0.1', port))
+ self.active = False
+ threading.Thread.__init__(self)
+ self.setDaemon(False)
+
+ def start (self, flag=None):
+ self.flag = flag
+ threading.Thread.start(self)
+
+ def run (self):
+ self.sock.settimeout(0.5)
+ self.sock.listen(5)
+ self.active = True
+ if self.flag:
+ # signal an event
+ self.flag.set()
+ while self.active:
+ try:
+ newconn, connaddr = self.sock.accept()
+ if test_support.verbose:
+ sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
+ handler = self.ConnectionHandler(self, newconn)
+ handler.start()
+ except socket.timeout:
+ pass
+ except KeyboardInterrupt:
+ self.stop()
+ except:
+ handle_error("Test server failure:\n")
+
+ def stop (self):
+ self.active = False
+ self.sock.close()
+
+CERTFILE_CONFIG_TEMPLATE = """
+# create RSA certs - Server
+
+[ req ]
+default_bits = 1024
+encrypt_key = yes
+distinguished_name = req_dn
+x509_extensions = cert_type
+
+[ req_dn ]
+countryName = Country Name (2 letter code)
+countryName_default = US
+countryName_min = 2
+countryName_max = 2
+
+stateOrProvinceName = State or Province Name (full name)
+stateOrProvinceName_default = %(state)s
+
+localityName = Locality Name (eg, city)
+localityName_default = %(city)s
+
+0.organizationName = Organization Name (eg, company)
+0.organizationName_default = %(organization)s
+
+organizationalUnitName = Organizational Unit Name (eg, section)
+organizationalUnitName_default = %(unit)s
+
+0.commonName = Common Name (FQDN of your server)
+0.commonName_default = %(common-name)s
+
+# To create a certificate for more than one name uncomment:
+# 1.commonName = DNS alias of your server
+# 2.commonName = DNS alias of your server
+# ...
+# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html
+# to see how Netscape understands commonName.
+
+[ cert_type ]
+nsCertType = server
+"""
+
+def create_cert_files(hostname=None):
+
+ """This is the routine that was run to create the certificate
+ and private key contained in keycert.pem."""
+
+ import tempfile, socket, os
+ d = tempfile.mkdtemp()
+ # now create a configuration file for the CA signing cert
+ fqdn = hostname or socket.getfqdn()
+ crtfile = os.path.join(d, "cert.pem")
+ conffile = os.path.join(d, "ca.conf")
+ fp = open(conffile, "w")
+ fp.write(CERTFILE_CONFIG_TEMPLATE %
+ {'state': "Delaware",
+ 'city': "Wilmington",
+ 'organization': "Python Software Foundation",
+ 'unit': "SSL",
+ 'common-name': fqdn,
+ })
+ fp.close()
+ error = os.system(
+ "openssl req -batch -new -x509 -days 2000 -nodes -config %s "
+ "-keyout \"%s\" -out \"%s\" > /dev/null < /dev/null 2>&1" %
+ (conffile, crtfile, crtfile))
+ # now we have a self-signed server cert in crtfile
+ os.unlink(conffile)
+ if (os.WEXITSTATUS(error) or
+ not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0):
+ if test_support.verbose:
+ sys.stdout.write("Unable to create certificate for test, "
+ + "error status %d\n" % (error >> 8))
+ crtfile = None
+ elif test_support.verbose:
+ sys.stdout.write(open(crtfile, 'r').read() + '\n')
+ return d, crtfile
+
+
+def test_main(verbose=False):
+ if skip_expected:
+ raise test_support.TestSkipped("socket module has no ssl support")
+
+ global CERTFILE
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
+ "keycert.pem")
+ if not CERTFILE:
+ sys.__stdout__.write("Skipping test_ssl ConnectedTests; "
+ "couldn't create a certificate.\n")
+
+ tests = [BasicTests]
+
+ server = None
+ if CERTFILE and test_support.is_resource_enabled('network'):
+ server = ThreadedEchoServer(10024, CERTFILE)
+ flag = threading.Event()
+ server.start(flag)
+ # wait for it to start
+ flag.wait()
+ tests.append(ConnectedTests)
+
+ thread_info = test_support.threading_setup()
+
+ try:
+ test_support.run_unittest(*tests)
+ finally:
+ if server is not None and server.active:
+ server.stop()
+ # wait for it to stop
+ server.join()
+
+ test_support.threading_cleanup(*thread_info)
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_structmembers.py b/Lib/test/test_structmembers.py
index c3d01c8..93d7bfb 100644
--- a/Lib/test/test_structmembers.py
+++ b/Lib/test/test_structmembers.py
@@ -5,7 +5,7 @@ from _testcapi import test_structmembersType, \
LONG_MAX, LONG_MIN, ULONG_MAX, \
LLONG_MAX, LLONG_MIN, ULLONG_MAX
-import warnings, unittest
+import warnings, unittest, sys
from test import test_support
ts=test_structmembersType(1,2,3,4,5,6,7,8,9.99999,10.1010101010)
@@ -59,7 +59,7 @@ class ReadWriteTests(unittest.TestCase):
class TestWarnings(unittest.TestCase):
def has_warned(self, w):
- self.assert_(w.category is RuntimeWarning)
+ self.assertEqual(w.category, RuntimeWarning)
def test_byte_max(self):
with test_support.catch_warning() as w:
@@ -94,10 +94,13 @@ class TestWarnings(unittest.TestCase):
def test_main(verbose=None):
- test_support.run_unittest(
- ReadWriteTests,
- TestWarnings
- )
+ # Obscure hack so that this test passes after reloads or repeated calls
+ # to test_main (regrtest -R).
+ if '__warningregistry__' in globals():
+ del globals()['__warningregistry__']
+ if hasattr(sys, '__warningregistry__'):
+ del sys.__warningregistry__
+ test_support.run_unittest(__name__)
if __name__ == "__main__":
test_main(verbose=True)
diff --git a/Lib/test/test_structseq.py b/Lib/test/test_structseq.py
index 12ebef9..7a18fb2 100644
--- a/Lib/test/test_structseq.py
+++ b/Lib/test/test_structseq.py
@@ -97,6 +97,18 @@ class StructSeqTest(unittest.TestCase):
t = time.gmtime()
x = t.__reduce__()
+ def test_extended_getslice(self):
+ # Test extended slicing by comparing with list slicing.
+ t = time.gmtime()
+ L = list(t)
+ indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
+ for start in indices:
+ for stop in indices:
+ # Skip step 0 (invalid)
+ for step in indices[1:]:
+ self.assertEqual(list(t[start:stop:step]),
+ L[start:stop:step])
+
def test_main():
test_support.run_unittest(StructSeqTest)
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 2d9612e..99f57e6 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -362,6 +362,22 @@ def transient_internet():
return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
+@contextlib.contextmanager
+def captured_stdout():
+ """Run the with statement body using a StringIO object as sys.stdout.
+ Example use::
+
+ with captured_stdout() as s:
+ print "hello"
+ assert s.getvalue() == "hello"
+ """
+ import io
+ orig_stdout = sys.stdout
+ sys.stdout = io.StringIO()
+ yield sys.stdout
+ sys.stdout = orig_stdout
+
+
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index 8a59879..10498bf 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -140,11 +140,25 @@ class UstarReadTest(ReadTest):
class MiscReadTest(ReadTest):
- def test_no_filename(self):
+ def test_no_name_argument(self):
fobj = open(self.tarname, "rb")
tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, os.path.abspath(fobj.name))
+ def test_no_name_attribute(self):
+ data = open(self.tarname, "rb").read()
+ fobj = io.BytesIO(data)
+ self.assertRaises(AttributeError, getattr, fobj, "name")
+ tar = tarfile.open(fileobj=fobj, mode=self.mode)
+ self.assertEqual(tar.name, None)
+
+ def test_empty_name_attribute(self):
+ data = open(self.tarname, "rb").read()
+ fobj = io.BytesIO(data)
+ fobj.name = ""
+ tar = tarfile.open(fileobj=fobj, mode=self.mode)
+ self.assertEqual(tar.name, None)
+
def test_fail_comp(self):
# For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
if self.mode == "r:":
diff --git a/Lib/test/test_userstring.py b/Lib/test/test_userstring.py
index ec0f1a9..8bd8d10 100755
--- a/Lib/test/test_userstring.py
+++ b/Lib/test/test_userstring.py
@@ -3,6 +3,7 @@
# UserString instances should behave similar to builtin string objects.
import unittest
+import string
from test import test_support, string_tests
from UserString import UserString, MutableString
@@ -86,6 +87,28 @@ class MutableStringTest(UserStringTest):
del s[-1:10]
self.assertEqual(s, "fo")
+ def test_extended_set_del_slice(self):
+ indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100)
+ orig = string.ascii_letters + string.digits
+ for start in indices:
+ for stop in indices:
+ # Use indices[1:] when MutableString can handle real
+ # extended slices
+ for step in (None, 1, -1):
+ s = self.type2test(orig)
+ L = list(orig)
+ # Make sure we have a slice of exactly the right length,
+ # but with (hopefully) different data.
+ data = L[start:stop:step]
+ data.reverse()
+ L[start:stop:step] = data
+ s[start:stop:step] = "".join(data)
+ self.assertEquals(s, "".join(L))
+
+ del L[start:stop:step]
+ del s[start:stop:step]
+ self.assertEquals(s, "".join(L))
+
def test_immutable(self):
s = self.type2test("foobar")
s2 = s.immutable()
diff --git a/Lib/test/test_winreg.py b/Lib/test/test_winreg.py
index 9984af1..fc898b8 100644
--- a/Lib/test/test_winreg.py
+++ b/Lib/test/test_winreg.py
@@ -3,8 +3,9 @@
from _winreg import *
import os, sys
+import unittest
-from test.test_support import verify
+from test import test_support
test_key_name = "SOFTWARE\\Python Registry Test Key - Delete Me"
@@ -13,137 +14,152 @@ test_data = [
("String Val", "A string value", REG_SZ),
("StringExpand", "The path is %path%", REG_EXPAND_SZ),
("Multi-string", ["Lots", "of", "string", "values"], REG_MULTI_SZ),
- ("Raw Data", bytes("binary"+chr(0)+"data"), REG_BINARY),
+ ("Raw Data", b"binary\x00data", REG_BINARY),
("Big String", "x"*(2**14-1), REG_SZ),
("Big Binary", b"x"*(2**14), REG_BINARY),
]
-def WriteTestData(root_key):
- # Set the default value for this key.
- SetValue(root_key, test_key_name, REG_SZ, "Default value")
- key = CreateKey(root_key, test_key_name)
- # Create a sub-key
- sub_key = CreateKey(key, "sub_key")
- # Give the sub-key some named values
-
- for value_name, value_data, value_type in test_data:
- SetValueEx(sub_key, value_name, 0, value_type, value_data)
-
- # Check we wrote as many items as we thought.
- nkeys, nvalues, since_mod = QueryInfoKey(key)
- verify(nkeys==1, "Not the correct number of sub keys")
- verify(nvalues==1, "Not the correct number of values")
- nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
- verify(nkeys==0, "Not the correct number of sub keys")
- verify(nvalues==len(test_data), "Not the correct number of values")
- # Close this key this way...
- # (but before we do, copy the key as an integer - this allows
- # us to test that the key really gets closed).
- int_sub_key = int(sub_key)
- CloseKey(sub_key)
- try:
- QueryInfoKey(int_sub_key)
- raise RuntimeError, "It appears the CloseKey() function does not close the actual key!"
- except EnvironmentError:
- pass
- # ... and close that key that way :-)
- int_key = int(key)
- key.Close()
- try:
- QueryInfoKey(int_key)
- raise RuntimeError, "It appears the key.Close() function does not close the actual key!"
- except EnvironmentError:
- pass
-
-def ReadTestData(root_key):
- # Check we can get default value for this key.
- val = QueryValue(root_key, test_key_name)
- verify(type(val) is str and val=="Default value", "Registry didn't give back the correct value")
-
- key = OpenKey(root_key, test_key_name)
- # Read the sub-keys
- sub_key = OpenKey(key, "sub_key")
- # Check I can enumerate over the values.
- index = 0
- while 1:
+class WinregTests(unittest.TestCase):
+ remote_name = None
+
+ def WriteTestData(self, root_key):
+ # Set the default value for this key.
+ SetValue(root_key, test_key_name, REG_SZ, "Default value")
+ key = CreateKey(root_key, test_key_name)
+ # Create a sub-key
+ sub_key = CreateKey(key, "sub_key")
+ # Give the sub-key some named values
+
+ for value_name, value_data, value_type in test_data:
+ SetValueEx(sub_key, value_name, 0, value_type, value_data)
+
+ # Check we wrote as many items as we thought.
+ nkeys, nvalues, since_mod = QueryInfoKey(key)
+ self.assertEquals(nkeys, 1, "Not the correct number of sub keys")
+ self.assertEquals(nvalues, 1, "Not the correct number of values")
+ nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
+ self.assertEquals(nkeys, 0, "Not the correct number of sub keys")
+ self.assertEquals(nvalues, len(test_data),
+ "Not the correct number of values")
+ # Close this key this way...
+ # (but before we do, copy the key as an integer - this allows
+ # us to test that the key really gets closed).
+ int_sub_key = int(sub_key)
+ CloseKey(sub_key)
try:
- data = EnumValue(sub_key, index)
+ QueryInfoKey(int_sub_key)
+ self.fail("It appears the CloseKey() function does "
+ "not close the actual key!")
except EnvironmentError:
- break
- verify(data in test_data, "Didn't read back the correct test data")
- index = index + 1
- verify(index==len(test_data), "Didn't read the correct number of items")
- # Check I can directly access each item
- for value_name, value_data, value_type in test_data:
- read_val, read_typ = QueryValueEx(sub_key, value_name)
- verify(read_val==value_data and read_typ == value_type, \
- "Could not directly read the value" )
- sub_key.Close()
- # Enumerate our main key.
- read_val = EnumKey(key, 0)
- verify(read_val == "sub_key", "Read subkey value wrong")
- try:
- EnumKey(key, 1)
- verify(0, "Was able to get a second key when I only have one!")
- except EnvironmentError:
- pass
-
- key.Close()
-
-def DeleteTestData(root_key):
- key = OpenKey(root_key, test_key_name, 0, KEY_ALL_ACCESS)
- sub_key = OpenKey(key, "sub_key", 0, KEY_ALL_ACCESS)
- # It is not necessary to delete the values before deleting
- # the key (although subkeys must not exist). We delete them
- # manually just to prove we can :-)
- for value_name, value_data, value_type in test_data:
- DeleteValue(sub_key, value_name)
-
- nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
- verify(nkeys==0 and nvalues==0, "subkey not empty before delete")
- sub_key.Close()
- DeleteKey(key, "sub_key")
+ pass
+ # ... and close that key that way :-)
+ int_key = int(key)
+ key.Close()
+ try:
+ QueryInfoKey(int_key)
+ self.fail("It appears the key.Close() function "
+ "does not close the actual key!")
+ except EnvironmentError:
+ pass
+
+ def ReadTestData(self, root_key):
+ # Check we can get default value for this key.
+ val = QueryValue(root_key, test_key_name)
+ self.assertEquals(val, "Default value",
+ "Registry didn't give back the correct value")
- try:
- # Shouldnt be able to delete it twice!
- DeleteKey(key, "sub_key")
- verify(0, "Deleting the key twice succeeded")
- except EnvironmentError:
- pass
- key.Close()
- DeleteKey(root_key, test_key_name)
- # Opening should now fail!
- try:
key = OpenKey(root_key, test_key_name)
- verify(0, "Could open the non-existent key")
- except WindowsError: # Use this error name this time
- pass
-
-def TestAll(root_key):
- WriteTestData(root_key)
- ReadTestData(root_key)
- DeleteTestData(root_key)
-
-# Test on my local machine.
-TestAll(HKEY_CURRENT_USER)
-print("Local registry tests worked")
-try:
- remote_name = sys.argv[sys.argv.index("--remote")+1]
-except (IndexError, ValueError):
- remote_name = None
+ # Read the sub-keys
+ sub_key = OpenKey(key, "sub_key")
+ # Check I can enumerate over the values.
+ index = 0
+ while 1:
+ try:
+ data = EnumValue(sub_key, index)
+ except EnvironmentError:
+ break
+ self.assertEquals(data in test_data, True,
+ "Didn't read back the correct test data")
+ index = index + 1
+ self.assertEquals(index, len(test_data),
+ "Didn't read the correct number of items")
+ # Check I can directly access each item
+ for value_name, value_data, value_type in test_data:
+ read_val, read_typ = QueryValueEx(sub_key, value_name)
+ self.assertEquals(read_val, value_data,
+ "Could not directly read the value")
+ self.assertEquals(read_typ, value_type,
+ "Could not directly read the value")
+ sub_key.Close()
+ # Enumerate our main key.
+ read_val = EnumKey(key, 0)
+ self.assertEquals(read_val, "sub_key", "Read subkey value wrong")
+ try:
+ EnumKey(key, 1)
+ self.fail("Was able to get a second key when I only have one!")
+ except EnvironmentError:
+ pass
+
+ key.Close()
+
+ def DeleteTestData(self, root_key):
+ key = OpenKey(root_key, test_key_name, 0, KEY_ALL_ACCESS)
+ sub_key = OpenKey(key, "sub_key", 0, KEY_ALL_ACCESS)
+ # It is not necessary to delete the values before deleting
+ # the key (although subkeys must not exist). We delete them
+ # manually just to prove we can :-)
+ for value_name, value_data, value_type in test_data:
+ DeleteValue(sub_key, value_name)
+
+ nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
+ self.assertEquals(nkeys, 0, "subkey not empty before delete")
+ self.assertEquals(nvalues, 0, "subkey not empty before delete")
+ sub_key.Close()
+ DeleteKey(key, "sub_key")
-if remote_name is not None:
+ try:
+ # Shouldnt be able to delete it twice!
+ DeleteKey(key, "sub_key")
+ self.fail("Deleting the key twice succeeded")
+ except EnvironmentError:
+ pass
+ key.Close()
+ DeleteKey(root_key, test_key_name)
+ # Opening should now fail!
+ try:
+ key = OpenKey(root_key, test_key_name)
+ self.fail("Could open the non-existent key")
+ except WindowsError: # Use this error name this time
+ pass
+
+ def TestAll(self, root_key):
+ self.WriteTestData(root_key)
+ self.ReadTestData(root_key)
+ self.DeleteTestData(root_key)
+
+ def testLocalMachineRegistryWorks(self):
+ self.TestAll(HKEY_CURRENT_USER)
+
+ def testConnectRegistryToLocalMachineWorks(self):
+ # perform minimal ConnectRegistry test which just invokes it
+ h = ConnectRegistry(None, HKEY_LOCAL_MACHINE)
+ h.Close()
+
+ def testRemoteMachineRegistryWorks(self):
+ if not self.remote_name:
+ raise test_support.TestSkipped("Remote machine name "
+ "not specified.")
+ remote_key = ConnectRegistry(self.remote_name, HKEY_CURRENT_USER)
+ self.TestAll(remote_key)
+
+def test_main():
+ test_support.run_unittest(WinregTests)
+
+if __name__ == "__main__":
try:
- remote_key = ConnectRegistry(remote_name, HKEY_CURRENT_USER)
- except EnvironmentError as exc:
- print("Could not connect to the remote machine -", exc.strerror)
- remote_key = None
- if remote_key is not None:
- TestAll(remote_key)
- print("Remote registry tests worked")
-else:
- print("Remote registry calls can be tested using", end=' ')
- print("'test_winreg.py --remote \\\\machine_name'")
- # perform minimal ConnectRegistry test which just invokes it
- h = ConnectRegistry(None, HKEY_LOCAL_MACHINE)
- h.Close()
+ WinregTests.remote_name = sys.argv[sys.argv.index("--remote")+1]
+ except (IndexError, ValueError):
+ print("Remote registry calls can be tested using",
+ "'test_winreg.py --remote \\\\machine_name'")
+ WinregTests.remote_name = None
+ test_main()
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index ef61c55..07c7ad6 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -284,6 +284,27 @@ def http_server(evt, numrequests):
evt.set()
+def is_unavailable_exception(e):
+ '''Returns True if the given ProtocolError is the product of a server-side
+ exception caused by the 'temporarily unavailable' response sometimes
+ given by operations on non-blocking sockets.'''
+ # sometimes we get a -1 error code and/or empty headers
+ if e.errcode == -1 or e.headers is None:
+ return True
+
+ exc_mess = e.headers.get('X-exception')
+ if exc_mess and 'temporarily unavailable' in exc_mess.lower():
+ return True
+
+ return False
+
+# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
+# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This
+# condition occurs infrequently on some platforms, frequently on others, and
+# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
+# If the server class is updated at some point in the future to handle this
+# situation more gracefully, these tests should be modified appropriately.
+
class SimpleServerTestCase(unittest.TestCase):
def setUp(self):
# enable traceback reporting
@@ -291,7 +312,7 @@ class SimpleServerTestCase(unittest.TestCase):
self.evt = threading.Event()
# start server thread to handle requests
- serv_args = (self.evt, 2)
+ serv_args = (self.evt, 1)
threading.Thread(target=http_server, args=serv_args).start()
# wait for port to be assigned to server
@@ -314,8 +335,10 @@ class SimpleServerTestCase(unittest.TestCase):
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
self.assertEqual(p.pow(6,8), 6**8)
except xmlrpclib.ProtocolError as e:
- # protocol error; provide additional information in test output
- self.fail("%s\n%s" % (e, e.headers))
+ # ignore failures due to non-blocking socket 'unavailable' errors
+ if not is_unavailable_exception(e):
+ # protocol error; provide additional information in test output
+ self.fail("%s\n%s" % (e, e.headers))
def test_introspection1(self):
try:
@@ -325,8 +348,10 @@ class SimpleServerTestCase(unittest.TestCase):
'system.methodHelp', 'system.methodSignature', 'system.multicall'])
self.assertEqual(set(meth), expected_methods)
except xmlrpclib.ProtocolError as e:
- # protocol error; provide additional information in test output
- self.fail("%s\n%s" % (e, e.headers))
+ # ignore failures due to non-blocking socket 'unavailable' errors
+ if not is_unavailable_exception(e):
+ # protocol error; provide additional information in test output
+ self.fail("%s\n%s" % (e, e.headers))
def test_introspection2(self):
try:
@@ -334,19 +359,23 @@ class SimpleServerTestCase(unittest.TestCase):
divhelp = p.system.methodHelp('div')
self.assertEqual(divhelp, 'This is the div function')
except xmlrpclib.ProtocolError as e:
- # protocol error; provide additional information in test output
- self.fail("%s\n%s" % (e, e.headers))
+ # ignore failures due to non-blocking socket 'unavailable' errors
+ if not is_unavailable_exception(e):
+ # protocol error; provide additional information in test output
+ self.fail("%s\n%s" % (e, e.headers))
def test_introspection3(self):
# the SimpleXMLRPCServer doesn't support signatures, but
- # at least check that we can try
+ # at least check that we can try making the call
try:
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
divsig = p.system.methodSignature('div')
self.assertEqual(divsig, 'signatures not supported')
except xmlrpclib.ProtocolError as e:
- # protocol error; provide additional information in test output
- self.fail("%s\n%s" % (e, e.headers))
+ # ignore failures due to non-blocking socket 'unavailable' errors
+ if not is_unavailable_exception(e):
+ # protocol error; provide additional information in test output
+ self.fail("%s\n%s" % (e, e.headers))
def test_multicall(self):
try:
@@ -360,8 +389,10 @@ class SimpleServerTestCase(unittest.TestCase):
self.assertEqual(pow_result, 6**8)
self.assertEqual(div_result, 127//42)
except xmlrpclib.ProtocolError as e:
- # protocol error; provide additional information in test output
- self.fail("%s\n%s" % (e, e.headers))
+ # ignore failures due to non-blocking socket 'unavailable' errors
+ if not is_unavailable_exception(e):
+ # protocol error; provide additional information in test output
+ self.fail("%s\n%s" % (e, e.headers))
# This is a contrived way to make a failure occur on the server side