summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/exception_hierarchy.txt3
-rw-r--r--Lib/test/output/test_logging6
-rw-r--r--Lib/test/pickletester.py8
-rwxr-xr-xLib/test/regrtest.py30
-rw-r--r--Lib/test/string_tests.py307
-rw-r--r--Lib/test/test_bigmem.py964
-rw-r--r--Lib/test/test_builtin.py189
-rw-r--r--Lib/test/test_cmd_line.py39
-rw-r--r--Lib/test/test_codeccallbacks.py73
-rw-r--r--Lib/test/test_codecencodings_cn.py1
-rw-r--r--Lib/test/test_codecencodings_hk.py1
-rw-r--r--Lib/test/test_codecencodings_jp.py1
-rw-r--r--Lib/test/test_codecencodings_kr.py1
-rw-r--r--Lib/test/test_codecencodings_tw.py1
-rw-r--r--Lib/test/test_codecmaps_cn.py1
-rw-r--r--Lib/test/test_codecmaps_hk.py1
-rw-r--r--Lib/test/test_codecmaps_jp.py1
-rw-r--r--Lib/test/test_codecmaps_kr.py1
-rw-r--r--Lib/test/test_codecmaps_tw.py1
-rw-r--r--Lib/test/test_compiler.py4
-rw-r--r--Lib/test/test_contextlib.py31
-rw-r--r--Lib/test/test_cookielib.py16
-rw-r--r--Lib/test/test_datetime.py6
-rw-r--r--Lib/test/test_doctest.py39
-rw-r--r--Lib/test/test_exceptions.py93
-rw-r--r--Lib/test/test_file.py2
-rwxr-xr-xLib/test/test_grp.py5
-rw-r--r--Lib/test/test_import.py17
-rw-r--r--Lib/test/test_importhooks.py11
-rw-r--r--Lib/test/test_locale.py28
-rw-r--r--Lib/test/test_logging.py19
-rw-r--r--Lib/test/test_mailbox.py1676
-rw-r--r--Lib/test/test_multibytecodec.py1
-rw-r--r--Lib/test/test_old_mailbox.py120
-rw-r--r--Lib/test/test_optparse.py200
-rw-r--r--Lib/test/test_os.py30
-rw-r--r--Lib/test/test_pty.py13
-rw-r--r--Lib/test/test_rfc822.py10
-rw-r--r--Lib/test/test_setuptools.py16
-rw-r--r--Lib/test/test_shutil.py4
-rw-r--r--Lib/test/test_socket.py33
-rw-r--r--Lib/test/test_sqlite.py5
-rw-r--r--Lib/test/test_stringprep.py6
-rw-r--r--Lib/test/test_struct.py103
-rw-r--r--Lib/test/test_subprocess.py4
-rw-r--r--Lib/test/test_sundry.py6
-rw-r--r--Lib/test/test_support.py106
-rw-r--r--Lib/test/test_syntax.py58
-rw-r--r--Lib/test/test_tarfile.py118
-rw-r--r--Lib/test/test_tcl.py1
-rw-r--r--Lib/test/test_threaded_import.py21
-rw-r--r--Lib/test/test_traceback.py6
-rw-r--r--Lib/test/test_unicode.py15
-rw-r--r--Lib/test/test_unicodedata.py10
-rw-r--r--Lib/test/test_urllib2.py372
-rw-r--r--Lib/test/test_urllib2net.py179
-rw-r--r--Lib/test/test_weakref.py44
-rw-r--r--Lib/test/test_with.py84
-rw-r--r--Lib/test/test_zlib.py57
-rw-r--r--Lib/test/testtar.tarbin112640 -> 133120 bytes
-rw-r--r--Lib/test/threaded_import_hangers.py42
61 files changed, 4687 insertions, 553 deletions
diff --git a/Lib/test/exception_hierarchy.txt b/Lib/test/exception_hierarchy.txt
index 9ed92d0..58131d7 100644
--- a/Lib/test/exception_hierarchy.txt
+++ b/Lib/test/exception_hierarchy.txt
@@ -15,6 +15,7 @@ BaseException
| | +-- IOError
| | +-- OSError
| | +-- WindowsError (Windows)
+ | | +-- VMSError (VMS)
| +-- EOFError
| +-- ImportError
| +-- LookupError
@@ -43,4 +44,4 @@ BaseException
+-- SyntaxWarning
+-- UserWarning
+-- FutureWarning
- +-- OverflowWarning [not generated by the interpreter]
+ +-- ImportWarning
diff --git a/Lib/test/output/test_logging b/Lib/test/output/test_logging
index 7be3a3e..c0d6e06 100644
--- a/Lib/test/output/test_logging
+++ b/Lib/test/output/test_logging
@@ -488,12 +488,12 @@ INFO:a.b.c.d:Info 5
-- log_test4 begin ---------------------------------------------------
config0: ok.
config1: ok.
-config2: <class 'exceptions.AttributeError'>
-config3: <class 'exceptions.KeyError'>
+config2: <type 'exceptions.AttributeError'>
+config3: <type 'exceptions.KeyError'>
-- log_test4 end ---------------------------------------------------
-- log_test5 begin ---------------------------------------------------
ERROR:root:just testing
-<class 'exceptions.KeyError'>... Don't panic!
+<type 'exceptions.KeyError'>... Don't panic!
-- log_test5 end ---------------------------------------------------
-- logrecv output begin ---------------------------------------------------
ERR -> CRITICAL: Message 0 (via logrecv.tcp.ERR)
diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py
index 85e1dea..5b9da56 100644
--- a/Lib/test/pickletester.py
+++ b/Lib/test/pickletester.py
@@ -4,7 +4,8 @@ import cPickle
import pickletools
import copy_reg
-from test.test_support import TestFailed, have_unicode, TESTFN
+from test.test_support import TestFailed, have_unicode, TESTFN, \
+ run_with_locale
# Tests that try a number of pickle protocols should have a
# for proto in protocols:
@@ -527,6 +528,11 @@ class AbstractPickleTests(unittest.TestCase):
got = self.loads(p)
self.assertEqual(n, got)
+ @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
+ def test_float_format(self):
+ # make sure that floats are formatted locale independent
+ self.assertEqual(self.dumps(1.2)[0:3], 'F1.')
+
def test_reduce(self):
pass
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 566e54b..86961b0 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -25,6 +25,7 @@ Command line options:
-N: nocoverdir -- Put coverage files alongside modules
-L: runleaks -- run the leaks(1) command just before exit
-R: huntrleaks -- search for reference leaks (needs debug build, v. slow)
+-M: memlimit -- run very large memory-consuming tests
If non-option arguments are present, they are names for tests to run,
unless -x is given, in which case they are names for tests not to run.
@@ -63,6 +64,19 @@ of times further it is run and 'fname' is the name of the file the
reports are written to. These parameters all have defaults (5, 4 and
"reflog.txt" respectively), so the minimal invocation is '-R ::'.
+-M runs tests that require an exorbitant amount of memory. These tests
+typically try to ascertain containers keep working when containing more than
+2 bilion objects, and only work on 64-bit systems. The passed-in memlimit,
+which is a string in the form of '2.5Gb', determines howmuch memory the
+tests will limit themselves to (but they may go slightly over.) The number
+shouldn't be more memory than the machine has (including swap memory). You
+should also keep in mind that swap memory is generally much, much slower
+than RAM, and setting memlimit to all available RAM or higher will heavily
+tax the machine. On the other hand, it is no use running these tests with a
+limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect
+to use more than memlimit memory will be skipped. The big-memory tests
+generally run very, very long.
+
-u is used to specify which special resource intensive tests to run,
such as those requiring large file support or network connectivity.
The argument is a comma-separated list of words indicating the
@@ -124,6 +138,14 @@ if sys.maxint > 0x7fffffff:
warnings.filterwarnings("ignore", "hex/oct constants", FutureWarning,
"<string>")
+# Ignore ImportWarnings that only occur in the source tree,
+# (because of modules with the same name as source-directories in Modules/)
+for mod in ("ctypes", "gzip", "zipfile", "tarfile", "encodings.zlib_codec",
+ "test.test_zipimport", "test.test_zlib", "test.test_zipfile",
+ "test.test_codecs", "test.string_tests"):
+ warnings.filterwarnings(module=".*%s$" % (mod,),
+ action="ignore", category=ImportWarning)
+
# MacOSX (a.k.a. Darwin) has a default stack size that is too small
# for deeply recursive regular expressions. We see this as crashes in
# the Python test suite when running test_re.py and test_sre.py. The
@@ -180,12 +202,12 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, generate=False,
test_support.record_original_stdout(sys.stdout)
try:
- opts, args = getopt.getopt(sys.argv[1:], 'hvgqxsrf:lu:t:TD:NLR:w',
+ opts, args = getopt.getopt(sys.argv[1:], 'hvgqxsrf:lu:t:TD:NLR:wM:',
['help', 'verbose', 'quiet', 'generate',
'exclude', 'single', 'random', 'fromfile',
'findleaks', 'use=', 'threshold=', 'trace',
'coverdir=', 'nocoverdir', 'runleaks',
- 'huntrleaks=', 'verbose2',
+ 'huntrleaks=', 'verbose2', 'memlimit=',
])
except getopt.error, msg:
usage(2, msg)
@@ -241,6 +263,8 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, generate=False,
huntrleaks[1] = int(huntrleaks[1])
if len(huntrleaks[2]) == 0:
huntrleaks[2] = "reflog.txt"
+ elif o in ('-M', '--memlimit'):
+ test_support.set_memlimit(a)
elif o in ('-u', '--use'):
u = [x.lower() for x in a.split(',')]
for r in u:
@@ -521,6 +545,7 @@ def runtest(test, generate, verbose, quiet, testdir=None, huntrleaks=False):
def cleanup():
import _strptime, linecache, warnings, dircache
import urlparse, urllib, urllib2, mimetypes, doctest
+ import struct
from distutils.dir_util import _path_created
_path_created.clear()
warnings.filters[:] = fs
@@ -537,6 +562,7 @@ def runtest(test, generate, verbose, quiet, testdir=None, huntrleaks=False):
dircache.reset()
linecache.clearcache()
mimetypes._default_mime_types()
+ struct._cache.clear()
doctest.master = None
if indirect_test:
def run_the_test():
diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py
index aab98c2..489af20 100644
--- a/Lib/test/string_tests.py
+++ b/Lib/test/string_tests.py
@@ -243,29 +243,72 @@ class CommonTest(unittest.TestCase):
self.checkequal(['a', 'b', 'c d'], 'a b c d', 'split', None, 2)
self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'split', None, 3)
self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'split', None, 4)
+ self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'split', None,
+ sys.maxint-1)
self.checkequal(['a b c d'], 'a b c d', 'split', None, 0)
+ self.checkequal(['a b c d'], ' a b c d', 'split', None, 0)
self.checkequal(['a', 'b', 'c d'], 'a b c d', 'split', None, 2)
+ self.checkequal([], ' ', 'split')
+ self.checkequal(['a'], ' a ', 'split')
+ self.checkequal(['a', 'b'], ' a b ', 'split')
+ self.checkequal(['a', 'b '], ' a b ', 'split', None, 1)
+ self.checkequal(['a', 'b c '], ' a b c ', 'split', None, 1)
+ self.checkequal(['a', 'b', 'c '], ' a b c ', 'split', None, 2)
+ self.checkequal(['a', 'b'], '\n\ta \t\r b \v ', 'split')
+ aaa = ' a '*20
+ self.checkequal(['a']*20, aaa, 'split')
+ self.checkequal(['a'] + [aaa[4:]], aaa, 'split', None, 1)
+ self.checkequal(['a']*19 + ['a '], aaa, 'split', None, 19)
+
# by a char
self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|')
+ self.checkequal(['a|b|c|d'], 'a|b|c|d', 'split', '|', 0)
self.checkequal(['a', 'b|c|d'], 'a|b|c|d', 'split', '|', 1)
self.checkequal(['a', 'b', 'c|d'], 'a|b|c|d', 'split', '|', 2)
self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|', 3)
self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|', 4)
+ self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|',
+ sys.maxint-2)
self.checkequal(['a|b|c|d'], 'a|b|c|d', 'split', '|', 0)
self.checkequal(['a', '', 'b||c||d'], 'a||b||c||d', 'split', '|', 2)
self.checkequal(['endcase ', ''], 'endcase |', 'split', '|')
+ self.checkequal(['', ' startcase'], '| startcase', 'split', '|')
+ self.checkequal(['', 'bothcase', ''], '|bothcase|', 'split', '|')
self.checkequal(['a', '', 'b\x00c\x00d'], 'a\x00\x00b\x00c\x00d', 'split', '\x00', 2)
+ self.checkequal(['a']*20, ('a|'*20)[:-1], 'split', '|')
+ self.checkequal(['a']*15 +['a|a|a|a|a'],
+ ('a|'*20)[:-1], 'split', '|', 15)
+
# by string
self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//')
self.checkequal(['a', 'b//c//d'], 'a//b//c//d', 'split', '//', 1)
self.checkequal(['a', 'b', 'c//d'], 'a//b//c//d', 'split', '//', 2)
self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//', 3)
self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//', 4)
+ self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//',
+ sys.maxint-10)
self.checkequal(['a//b//c//d'], 'a//b//c//d', 'split', '//', 0)
self.checkequal(['a', '', 'b////c////d'], 'a////b////c////d', 'split', '//', 2)
self.checkequal(['endcase ', ''], 'endcase test', 'split', 'test')
+ self.checkequal(['', ' begincase'], 'test begincase', 'split', 'test')
+ self.checkequal(['', ' bothcase ', ''], 'test bothcase test',
+ 'split', 'test')
+ self.checkequal(['a', 'bc'], 'abbbc', 'split', 'bb')
+ self.checkequal(['', ''], 'aaa', 'split', 'aaa')
+ self.checkequal(['aaa'], 'aaa', 'split', 'aaa', 0)
+ self.checkequal(['ab', 'ab'], 'abbaab', 'split', 'ba')
+ self.checkequal(['aaaa'], 'aaaa', 'split', 'aab')
+ self.checkequal([''], '', 'split', 'aaa')
+ self.checkequal(['aa'], 'aa', 'split', 'aaa')
+ self.checkequal(['A', 'bobb'], 'Abbobbbobb', 'split', 'bbobb')
+ self.checkequal(['A', 'B', ''], 'AbbobbBbbobb', 'split', 'bbobb')
+
+ self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'split', 'BLAH')
+ self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'split', 'BLAH', 19)
+ self.checkequal(['a']*18 + ['aBLAHa'], ('aBLAH'*20)[:-4],
+ 'split', 'BLAH', 18)
# mixed use of str and unicode
self.checkequal([u'a', u'b', u'c d'], 'a b c d', 'split', u' ', 2)
@@ -273,6 +316,10 @@ class CommonTest(unittest.TestCase):
# argument type
self.checkraises(TypeError, 'hello', 'split', 42, 42, 42)
+ # null case
+ self.checkraises(ValueError, 'hello', 'split', '')
+ self.checkraises(ValueError, 'hello', 'split', '', 0)
+
def test_rsplit(self):
self.checkequal(['this', 'is', 'the', 'rsplit', 'function'],
'this is the rsplit function', 'rsplit')
@@ -283,29 +330,75 @@ class CommonTest(unittest.TestCase):
self.checkequal(['a b', 'c', 'd'], 'a b c d', 'rsplit', None, 2)
self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'rsplit', None, 3)
self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'rsplit', None, 4)
+ self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'rsplit', None,
+ sys.maxint-20)
self.checkequal(['a b c d'], 'a b c d', 'rsplit', None, 0)
+ self.checkequal(['a b c d'], 'a b c d ', 'rsplit', None, 0)
self.checkequal(['a b', 'c', 'd'], 'a b c d', 'rsplit', None, 2)
+ self.checkequal([], ' ', 'rsplit')
+ self.checkequal(['a'], ' a ', 'rsplit')
+ self.checkequal(['a', 'b'], ' a b ', 'rsplit')
+ self.checkequal([' a', 'b'], ' a b ', 'rsplit', None, 1)
+ self.checkequal([' a b','c'], ' a b c ', 'rsplit',
+ None, 1)
+ self.checkequal([' a', 'b', 'c'], ' a b c ', 'rsplit',
+ None, 2)
+ self.checkequal(['a', 'b'], '\n\ta \t\r b \v ', 'rsplit', None, 88)
+ aaa = ' a '*20
+ self.checkequal(['a']*20, aaa, 'rsplit')
+ self.checkequal([aaa[:-4]] + ['a'], aaa, 'rsplit', None, 1)
+ self.checkequal([' a a'] + ['a']*18, aaa, 'rsplit', None, 18)
+
+
# by a char
self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|')
self.checkequal(['a|b|c', 'd'], 'a|b|c|d', 'rsplit', '|', 1)
self.checkequal(['a|b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|', 2)
self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|', 3)
self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|', 4)
+ self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|',
+ sys.maxint-100)
self.checkequal(['a|b|c|d'], 'a|b|c|d', 'rsplit', '|', 0)
self.checkequal(['a||b||c', '', 'd'], 'a||b||c||d', 'rsplit', '|', 2)
self.checkequal(['', ' begincase'], '| begincase', 'rsplit', '|')
+ self.checkequal(['endcase ', ''], 'endcase |', 'rsplit', '|')
+ self.checkequal(['', 'bothcase', ''], '|bothcase|', 'rsplit', '|')
+
self.checkequal(['a\x00\x00b', 'c', 'd'], 'a\x00\x00b\x00c\x00d', 'rsplit', '\x00', 2)
+ self.checkequal(['a']*20, ('a|'*20)[:-1], 'rsplit', '|')
+ self.checkequal(['a|a|a|a|a']+['a']*15,
+ ('a|'*20)[:-1], 'rsplit', '|', 15)
+
# by string
self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//')
self.checkequal(['a//b//c', 'd'], 'a//b//c//d', 'rsplit', '//', 1)
self.checkequal(['a//b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//', 2)
self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//', 3)
self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//', 4)
+ self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//',
+ sys.maxint-5)
self.checkequal(['a//b//c//d'], 'a//b//c//d', 'rsplit', '//', 0)
self.checkequal(['a////b////c', '', 'd'], 'a////b////c////d', 'rsplit', '//', 2)
self.checkequal(['', ' begincase'], 'test begincase', 'rsplit', 'test')
+ self.checkequal(['endcase ', ''], 'endcase test', 'rsplit', 'test')
+ self.checkequal(['', ' bothcase ', ''], 'test bothcase test',
+ 'rsplit', 'test')
+ self.checkequal(['ab', 'c'], 'abbbc', 'rsplit', 'bb')
+ self.checkequal(['', ''], 'aaa', 'rsplit', 'aaa')
+ self.checkequal(['aaa'], 'aaa', 'rsplit', 'aaa', 0)
+ self.checkequal(['ab', 'ab'], 'abbaab', 'rsplit', 'ba')
+ self.checkequal(['aaaa'], 'aaaa', 'rsplit', 'aab')
+ self.checkequal([''], '', 'rsplit', 'aaa')
+ self.checkequal(['aa'], 'aa', 'rsplit', 'aaa')
+ self.checkequal(['bbob', 'A'], 'bbobbbobbA', 'rsplit', 'bbobb')
+ self.checkequal(['', 'B', 'A'], 'bbobbBbbobbA', 'rsplit', 'bbobb')
+
+ self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'rsplit', 'BLAH')
+ self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'rsplit', 'BLAH', 19)
+ self.checkequal(['aBLAHa'] + ['a']*18, ('aBLAH'*20)[:-4],
+ 'rsplit', 'BLAH', 18)
# mixed use of str and unicode
self.checkequal([u'a b', u'c', u'd'], 'a b c d', 'rsplit', u' ', 2)
@@ -313,6 +406,10 @@ class CommonTest(unittest.TestCase):
# argument type
self.checkraises(TypeError, 'hello', 'rsplit', 42, 42, 42)
+ # null case
+ self.checkraises(ValueError, 'hello', 'rsplit', '')
+ self.checkraises(ValueError, 'hello', 'rsplit', '', 0)
+
def test_strip(self):
self.checkequal('hello', ' hello ', 'strip')
self.checkequal('hello ', ' hello ', 'lstrip')
@@ -376,6 +473,158 @@ class CommonTest(unittest.TestCase):
self.checkraises(TypeError, 'hello', 'swapcase', 42)
def test_replace(self):
+ EQ = self.checkequal
+
+ # Operations on the empty string
+ EQ("", "", "replace", "", "")
+
+ #EQ("A", "", "replace", "", "A")
+ # That was the correct result; this is the result we actually get
+ # now (for str, but not for unicode):
+ #EQ("", "", "replace", "", "A")
+
+ EQ("", "", "replace", "A", "")
+ EQ("", "", "replace", "A", "A")
+ EQ("", "", "replace", "", "", 100)
+ EQ("", "", "replace", "", "", sys.maxint)
+
+ # interleave (from=="", 'to' gets inserted everywhere)
+ EQ("A", "A", "replace", "", "")
+ EQ("*A*", "A", "replace", "", "*")
+ EQ("*1A*1", "A", "replace", "", "*1")
+ EQ("*-#A*-#", "A", "replace", "", "*-#")
+ EQ("*-A*-A*-", "AA", "replace", "", "*-")
+ EQ("*-A*-A*-", "AA", "replace", "", "*-", -1)
+ EQ("*-A*-A*-", "AA", "replace", "", "*-", sys.maxint)
+ EQ("*-A*-A*-", "AA", "replace", "", "*-", 4)
+ EQ("*-A*-A*-", "AA", "replace", "", "*-", 3)
+ EQ("*-A*-A", "AA", "replace", "", "*-", 2)
+ EQ("*-AA", "AA", "replace", "", "*-", 1)
+ EQ("AA", "AA", "replace", "", "*-", 0)
+
+ # single character deletion (from=="A", to=="")
+ EQ("", "A", "replace", "A", "")
+ EQ("", "AAA", "replace", "A", "")
+ EQ("", "AAA", "replace", "A", "", -1)
+ EQ("", "AAA", "replace", "A", "", sys.maxint)
+ EQ("", "AAA", "replace", "A", "", 4)
+ EQ("", "AAA", "replace", "A", "", 3)
+ EQ("A", "AAA", "replace", "A", "", 2)
+ EQ("AA", "AAA", "replace", "A", "", 1)
+ EQ("AAA", "AAA", "replace", "A", "", 0)
+ EQ("", "AAAAAAAAAA", "replace", "A", "")
+ EQ("BCD", "ABACADA", "replace", "A", "")
+ EQ("BCD", "ABACADA", "replace", "A", "", -1)
+ EQ("BCD", "ABACADA", "replace", "A", "", sys.maxint)
+ EQ("BCD", "ABACADA", "replace", "A", "", 5)
+ EQ("BCD", "ABACADA", "replace", "A", "", 4)
+ EQ("BCDA", "ABACADA", "replace", "A", "", 3)
+ EQ("BCADA", "ABACADA", "replace", "A", "", 2)
+ EQ("BACADA", "ABACADA", "replace", "A", "", 1)
+ EQ("ABACADA", "ABACADA", "replace", "A", "", 0)
+ EQ("BCD", "ABCAD", "replace", "A", "")
+ EQ("BCD", "ABCADAA", "replace", "A", "")
+ EQ("BCD", "BCD", "replace", "A", "")
+ EQ("*************", "*************", "replace", "A", "")
+ EQ("^A^", "^"+"A"*1000+"^", "replace", "A", "", 999)
+
+ # substring deletion (from=="the", to=="")
+ EQ("", "the", "replace", "the", "")
+ EQ("ater", "theater", "replace", "the", "")
+ EQ("", "thethe", "replace", "the", "")
+ EQ("", "thethethethe", "replace", "the", "")
+ EQ("aaaa", "theatheatheathea", "replace", "the", "")
+ EQ("that", "that", "replace", "the", "")
+ EQ("thaet", "thaet", "replace", "the", "")
+ EQ("here and re", "here and there", "replace", "the", "")
+ EQ("here and re and re", "here and there and there",
+ "replace", "the", "", sys.maxint)
+ EQ("here and re and re", "here and there and there",
+ "replace", "the", "", -1)
+ EQ("here and re and re", "here and there and there",
+ "replace", "the", "", 3)
+ EQ("here and re and re", "here and there and there",
+ "replace", "the", "", 2)
+ EQ("here and re and there", "here and there and there",
+ "replace", "the", "", 1)
+ EQ("here and there and there", "here and there and there",
+ "replace", "the", "", 0)
+ EQ("here and re and re", "here and there and there", "replace", "the", "")
+
+ EQ("abc", "abc", "replace", "the", "")
+ EQ("abcdefg", "abcdefg", "replace", "the", "")
+
+ # substring deletion (from=="bob", to=="")
+ EQ("bob", "bbobob", "replace", "bob", "")
+ EQ("bobXbob", "bbobobXbbobob", "replace", "bob", "")
+ EQ("aaaaaaa", "aaaaaaabob", "replace", "bob", "")
+ EQ("aaaaaaa", "aaaaaaa", "replace", "bob", "")
+
+ # single character replace in place (len(from)==len(to)==1)
+ EQ("Who goes there?", "Who goes there?", "replace", "o", "o")
+ EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O")
+ EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", sys.maxint)
+ EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", -1)
+ EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", 3)
+ EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", 2)
+ EQ("WhO goes there?", "Who goes there?", "replace", "o", "O", 1)
+ EQ("Who goes there?", "Who goes there?", "replace", "o", "O", 0)
+
+ EQ("Who goes there?", "Who goes there?", "replace", "a", "q")
+ EQ("who goes there?", "Who goes there?", "replace", "W", "w")
+ EQ("wwho goes there?ww", "WWho goes there?WW", "replace", "W", "w")
+ EQ("Who goes there!", "Who goes there?", "replace", "?", "!")
+ EQ("Who goes there!!", "Who goes there??", "replace", "?", "!")
+
+ EQ("Who goes there?", "Who goes there?", "replace", ".", "!")
+
+ # substring replace in place (len(from)==len(to) > 1)
+ EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**")
+ EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", sys.maxint)
+ EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", -1)
+ EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", 4)
+ EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", 3)
+ EQ("Th** ** a tissue", "This is a tissue", "replace", "is", "**", 2)
+ EQ("Th** is a tissue", "This is a tissue", "replace", "is", "**", 1)
+ EQ("This is a tissue", "This is a tissue", "replace", "is", "**", 0)
+ EQ("cobob", "bobob", "replace", "bob", "cob")
+ EQ("cobobXcobocob", "bobobXbobobob", "replace", "bob", "cob")
+ EQ("bobob", "bobob", "replace", "bot", "bot")
+
+ # replace single character (len(from)==1, len(to)>1)
+ EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK")
+ EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK", -1)
+ EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK", sys.maxint)
+ EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK", 2)
+ EQ("ReyKKjavik", "Reykjavik", "replace", "k", "KK", 1)
+ EQ("Reykjavik", "Reykjavik", "replace", "k", "KK", 0)
+ EQ("A----B----C----", "A.B.C.", "replace", ".", "----")
+
+ EQ("Reykjavik", "Reykjavik", "replace", "q", "KK")
+
+ # replace substring (len(from)>1, len(to)!=len(from))
+ EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam",
+ "replace", "spam", "ham")
+ EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam",
+ "replace", "spam", "ham", sys.maxint)
+ EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam",
+ "replace", "spam", "ham", -1)
+ EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam",
+ "replace", "spam", "ham", 4)
+ EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam",
+ "replace", "spam", "ham", 3)
+ EQ("ham, ham, eggs and spam", "spam, spam, eggs and spam",
+ "replace", "spam", "ham", 2)
+ EQ("ham, spam, eggs and spam", "spam, spam, eggs and spam",
+ "replace", "spam", "ham", 1)
+ EQ("spam, spam, eggs and spam", "spam, spam, eggs and spam",
+ "replace", "spam", "ham", 0)
+
+ EQ("bobob", "bobobob", "replace", "bobob", "bob")
+ EQ("bobobXbobob", "bobobobXbobobob", "replace", "bobob", "bob")
+ EQ("BOBOBOB", "BOBOBOB", "replace", "bob", "bobby")
+
+ #
self.checkequal('one@two!three!', 'one!two!three!', 'replace', '!', '@', 1)
self.checkequal('onetwothree', 'one!two!three!', 'replace', '!', '')
self.checkequal('one@two@three!', 'one!two!three!', 'replace', '!', '@', 2)
@@ -403,6 +652,15 @@ class CommonTest(unittest.TestCase):
self.checkraises(TypeError, 'hello', 'replace', 42, 'h')
self.checkraises(TypeError, 'hello', 'replace', 'h', 42)
+ def test_replace_overflow(self):
+ # Check for overflow checking on 32 bit machines
+ if sys.maxint != 2147483647:
+ return
+ A2_16 = "A" * (2**16)
+ self.checkraises(OverflowError, A2_16, "replace", "", A2_16)
+ self.checkraises(OverflowError, A2_16, "replace", "A", A2_16)
+ self.checkraises(OverflowError, A2_16, "replace", "AA", A2_16+A2_16)
+
def test_zfill(self):
self.checkequal('123', '123', 'zfill', 2)
self.checkequal('123', '123', 'zfill', 3)
@@ -720,6 +978,55 @@ class MixinStrUnicodeUserStringTest:
else:
self.checkcall(format, "__mod__", value)
+ def test_inplace_rewrites(self):
+ # Check that strings don't copy and modify cached single-character strings
+ self.checkequal('a', 'A', 'lower')
+ self.checkequal(True, 'A', 'isupper')
+ self.checkequal('A', 'a', 'upper')
+ self.checkequal(True, 'a', 'islower')
+
+ self.checkequal('a', 'A', 'replace', 'A', 'a')
+ self.checkequal(True, 'A', 'isupper')
+
+ self.checkequal('A', 'a', 'capitalize')
+ self.checkequal(True, 'a', 'islower')
+
+ self.checkequal('A', 'a', 'swapcase')
+ self.checkequal(True, 'a', 'islower')
+
+ self.checkequal('A', 'a', 'title')
+ self.checkequal(True, 'a', 'islower')
+
+ def test_partition(self):
+
+ self.checkequal(('this is the par', 'ti', 'tion method'),
+ 'this is the partition method', 'partition', 'ti')
+
+ # from raymond's original specification
+ S = 'http://www.python.org'
+ self.checkequal(('http', '://', 'www.python.org'), S, 'partition', '://')
+ self.checkequal(('http://www.python.org', '', ''), S, 'partition', '?')
+ self.checkequal(('', 'http://', 'www.python.org'), S, 'partition', 'http://')
+ self.checkequal(('http://www.python.', 'org', ''), S, 'partition', 'org')
+
+ self.checkraises(ValueError, S, 'partition', '')
+ self.checkraises(TypeError, S, 'partition', None)
+
+ def test_rpartition(self):
+
+ self.checkequal(('this is the rparti', 'ti', 'on method'),
+ 'this is the rpartition method', 'rpartition', 'ti')
+
+ # from raymond's original specification
+ S = 'http://www.python.org'
+ self.checkequal(('http', '://', 'www.python.org'), S, 'rpartition', '://')
+ self.checkequal(('http://www.python.org', '', ''), S, 'rpartition', '?')
+ self.checkequal(('', 'http://', 'www.python.org'), S, 'rpartition', 'http://')
+ self.checkequal(('http://www.python.', 'org', ''), S, 'rpartition', 'org')
+
+ self.checkraises(ValueError, S, 'rpartition', '')
+ self.checkraises(TypeError, S, 'rpartition', None)
+
class MixinStrStringUserStringTest:
# Additional tests for 8bit strings, i.e. str, UserString and
diff --git a/Lib/test/test_bigmem.py b/Lib/test/test_bigmem.py
new file mode 100644
index 0000000..255428f
--- /dev/null
+++ b/Lib/test/test_bigmem.py
@@ -0,0 +1,964 @@
+from test import test_support
+from test.test_support import bigmemtest, _1G, _2G
+
+import unittest
+import operator
+import string
+import sys
+
+# Bigmem testing houserules:
+#
+# - Try not to allocate too many large objects. It's okay to rely on
+# refcounting semantics, but don't forget that 's = create_largestring()'
+# doesn't release the old 's' (if it exists) until well after its new
+# value has been created. Use 'del s' before the create_largestring call.
+#
+# - Do *not* compare large objects using assertEquals or similar. It's a
+# lengty operation and the errormessage will be utterly useless due to
+# its size. To make sure whether a result has the right contents, better
+# to use the strip or count methods, or compare meaningful slices.
+#
+# - Don't forget to test for large indices, offsets and results and such,
+# in addition to large sizes.
+#
+# - When repeating an object (say, a substring, or a small list) to create
+# a large object, make the subobject of a length that is not a power of
+# 2. That way, int-wrapping problems are more easily detected.
+#
+# - While the bigmemtest decorator speaks of 'minsize', all tests will
+# actually be called with a much smaller number too, in the normal
+# test run (5Kb currently.) This is so the tests themselves get frequent
+# testing Consequently, always make all large allocations based on the
+# passed-in 'size', and don't rely on the size being very large. Also,
+# memuse-per-size should remain sane (less than a few thousand); if your
+# test uses more, adjust 'size' upward, instead.
+
+class StrTest(unittest.TestCase):
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_capitalize(self, size):
+ SUBSTR = ' abc def ghi'
+ s = '-' * size + SUBSTR
+ caps = s.capitalize()
+ self.assertEquals(caps[-len(SUBSTR):],
+ SUBSTR.capitalize())
+ self.assertEquals(caps.lstrip('-'), SUBSTR)
+
+ @bigmemtest(minsize=_2G + 10, memuse=1)
+ def test_center(self, size):
+ SUBSTR = ' abc def ghi'
+ s = SUBSTR.center(size)
+ self.assertEquals(len(s), size)
+ lpadsize = rpadsize = (len(s) - len(SUBSTR)) // 2
+ if len(s) % 2:
+ lpadsize += 1
+ self.assertEquals(s[lpadsize:-rpadsize], SUBSTR)
+ self.assertEquals(s.strip(), SUBSTR.strip())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_count(self, size):
+ SUBSTR = ' abc def ghi'
+ s = '.' * size + SUBSTR
+ self.assertEquals(s.count('.'), size)
+ s += '.'
+ self.assertEquals(s.count('.'), size + 1)
+ self.assertEquals(s.count(' '), 3)
+ self.assertEquals(s.count('i'), 1)
+ self.assertEquals(s.count('j'), 0)
+
+ @bigmemtest(minsize=0, memuse=1)
+ def test_decode(self, size):
+ pass
+
+ @bigmemtest(minsize=0, memuse=1)
+ def test_encode(self, size):
+ pass
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_endswith(self, size):
+ SUBSTR = ' abc def ghi'
+ s = '-' * size + SUBSTR
+ self.failUnless(s.endswith(SUBSTR))
+ self.failUnless(s.endswith(s))
+ s2 = '...' + s
+ self.failUnless(s2.endswith(s))
+ self.failIf(s.endswith('a' + SUBSTR))
+ self.failIf(SUBSTR.endswith(s))
+
+ @bigmemtest(minsize=_2G + 10, memuse=2)
+ def test_expandtabs(self, size):
+ s = '-' * size
+ tabsize = 8
+ self.assertEquals(s.expandtabs(), s)
+ del s
+ slen, remainder = divmod(size, tabsize)
+ s = ' \t' * slen
+ s = s.expandtabs(tabsize)
+ self.assertEquals(len(s), size - remainder)
+ self.assertEquals(len(s.strip(' ')), 0)
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_find(self, size):
+ SUBSTR = ' abc def ghi'
+ sublen = len(SUBSTR)
+ s = ''.join([SUBSTR, '-' * size, SUBSTR])
+ self.assertEquals(s.find(' '), 0)
+ self.assertEquals(s.find(SUBSTR), 0)
+ self.assertEquals(s.find(' ', sublen), sublen + size)
+ self.assertEquals(s.find(SUBSTR, len(SUBSTR)), sublen + size)
+ self.assertEquals(s.find('i'), SUBSTR.find('i'))
+ self.assertEquals(s.find('i', sublen),
+ sublen + size + SUBSTR.find('i'))
+ self.assertEquals(s.find('i', size),
+ sublen + size + SUBSTR.find('i'))
+ self.assertEquals(s.find('j'), -1)
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_index(self, size):
+ SUBSTR = ' abc def ghi'
+ sublen = len(SUBSTR)
+ s = ''.join([SUBSTR, '-' * size, SUBSTR])
+ self.assertEquals(s.index(' '), 0)
+ self.assertEquals(s.index(SUBSTR), 0)
+ self.assertEquals(s.index(' ', sublen), sublen + size)
+ self.assertEquals(s.index(SUBSTR, sublen), sublen + size)
+ self.assertEquals(s.index('i'), SUBSTR.index('i'))
+ self.assertEquals(s.index('i', sublen),
+ sublen + size + SUBSTR.index('i'))
+ self.assertEquals(s.index('i', size),
+ sublen + size + SUBSTR.index('i'))
+ self.assertRaises(ValueError, s.index, 'j')
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_isalnum(self, size):
+ SUBSTR = '123456'
+ s = 'a' * size + SUBSTR
+ self.failUnless(s.isalnum())
+ s += '.'
+ self.failIf(s.isalnum())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_isalpha(self, size):
+ SUBSTR = 'zzzzzzz'
+ s = 'a' * size + SUBSTR
+ self.failUnless(s.isalpha())
+ s += '.'
+ self.failIf(s.isalpha())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_isdigit(self, size):
+ SUBSTR = '123456'
+ s = '9' * size + SUBSTR
+ self.failUnless(s.isdigit())
+ s += 'z'
+ self.failIf(s.isdigit())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_islower(self, size):
+ chars = ''.join([ chr(c) for c in range(255) if not chr(c).isupper() ])
+ repeats = size // len(chars) + 2
+ s = chars * repeats
+ self.failUnless(s.islower())
+ s += 'A'
+ self.failIf(s.islower())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_isspace(self, size):
+ whitespace = ' \f\n\r\t\v'
+ repeats = size // len(whitespace) + 2
+ s = whitespace * repeats
+ self.failUnless(s.isspace())
+ s += 'j'
+ self.failIf(s.isspace())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_istitle(self, size):
+ SUBSTR = '123456'
+ s = ''.join(['A', 'a' * size, SUBSTR])
+ self.failUnless(s.istitle())
+ s += 'A'
+ self.failUnless(s.istitle())
+ s += 'aA'
+ self.failIf(s.istitle())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_isupper(self, size):
+ chars = ''.join([ chr(c) for c in range(255) if not chr(c).islower() ])
+ repeats = size // len(chars) + 2
+ s = chars * repeats
+ self.failUnless(s.isupper())
+ s += 'a'
+ self.failIf(s.isupper())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_join(self, size):
+ s = 'A' * size
+ x = s.join(['aaaaa', 'bbbbb'])
+ self.assertEquals(x.count('a'), 5)
+ self.assertEquals(x.count('b'), 5)
+ self.failUnless(x.startswith('aaaaaA'))
+ self.failUnless(x.endswith('Abbbbb'))
+
+ @bigmemtest(minsize=_2G + 10, memuse=1)
+ def test_ljust(self, size):
+ SUBSTR = ' abc def ghi'
+ s = SUBSTR.ljust(size)
+ self.failUnless(s.startswith(SUBSTR + ' '))
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.strip(), SUBSTR.strip())
+
+ @bigmemtest(minsize=_2G + 10, memuse=2)
+ def test_lower(self, size):
+ s = 'A' * size
+ s = s.lower()
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.count('a'), size)
+
+ @bigmemtest(minsize=_2G + 10, memuse=1)
+ def test_lstrip(self, size):
+ SUBSTR = 'abc def ghi'
+ s = SUBSTR.rjust(size)
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.lstrip(), SUBSTR.lstrip())
+ del s
+ s = SUBSTR.ljust(size)
+ self.assertEquals(len(s), size)
+ stripped = s.lstrip()
+ self.failUnless(stripped is s)
+
+ @bigmemtest(minsize=_2G + 10, memuse=2)
+ def test_replace(self, size):
+ replacement = 'a'
+ s = ' ' * size
+ s = s.replace(' ', replacement)
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.count(replacement), size)
+ s = s.replace(replacement, ' ', size - 4)
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.count(replacement), 4)
+ self.assertEquals(s[-10:], ' aaaa')
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_rfind(self, size):
+ SUBSTR = ' abc def ghi'
+ sublen = len(SUBSTR)
+ s = ''.join([SUBSTR, '-' * size, SUBSTR])
+ self.assertEquals(s.rfind(' '), sublen + size + SUBSTR.rfind(' '))
+ self.assertEquals(s.rfind(SUBSTR), sublen + size)
+ self.assertEquals(s.rfind(' ', 0, size), SUBSTR.rfind(' '))
+ self.assertEquals(s.rfind(SUBSTR, 0, sublen + size), 0)
+ self.assertEquals(s.rfind('i'), sublen + size + SUBSTR.rfind('i'))
+ self.assertEquals(s.rfind('i', 0, sublen), SUBSTR.rfind('i'))
+ self.assertEquals(s.rfind('i', 0, sublen + size),
+ SUBSTR.rfind('i'))
+ self.assertEquals(s.rfind('j'), -1)
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_rindex(self, size):
+ SUBSTR = ' abc def ghi'
+ sublen = len(SUBSTR)
+ s = ''.join([SUBSTR, '-' * size, SUBSTR])
+ self.assertEquals(s.rindex(' '),
+ sublen + size + SUBSTR.rindex(' '))
+ self.assertEquals(s.rindex(SUBSTR), sublen + size)
+ self.assertEquals(s.rindex(' ', 0, sublen + size - 1),
+ SUBSTR.rindex(' '))
+ self.assertEquals(s.rindex(SUBSTR, 0, sublen + size), 0)
+ self.assertEquals(s.rindex('i'),
+ sublen + size + SUBSTR.rindex('i'))
+ self.assertEquals(s.rindex('i', 0, sublen), SUBSTR.rindex('i'))
+ self.assertEquals(s.rindex('i', 0, sublen + size),
+ SUBSTR.rindex('i'))
+ self.assertRaises(ValueError, s.rindex, 'j')
+
+ @bigmemtest(minsize=_2G + 10, memuse=1)
+ def test_rjust(self, size):
+ SUBSTR = ' abc def ghi'
+ s = SUBSTR.ljust(size)
+ self.failUnless(s.startswith(SUBSTR + ' '))
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.strip(), SUBSTR.strip())
+
+ @bigmemtest(minsize=_2G + 10, memuse=1)
+ def test_rstrip(self, size):
+ SUBSTR = ' abc def ghi'
+ s = SUBSTR.ljust(size)
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.rstrip(), SUBSTR.rstrip())
+ del s
+ s = SUBSTR.rjust(size)
+ self.assertEquals(len(s), size)
+ stripped = s.rstrip()
+ self.failUnless(stripped is s)
+
+ # The test takes about size bytes to build a string, and then about
+ # sqrt(size) substrings of sqrt(size) in size and a list to
+ # hold sqrt(size) items. It's close but just over 2x size.
+ @bigmemtest(minsize=_2G, memuse=2.1)
+ def test_split_small(self, size):
+ # Crudely calculate an estimate so that the result of s.split won't
+ # take up an inordinate amount of memory
+ chunksize = int(size ** 0.5 + 2)
+ SUBSTR = 'a' + ' ' * chunksize
+ s = SUBSTR * chunksize
+ l = s.split()
+ self.assertEquals(len(l), chunksize)
+ self.assertEquals(set(l), set(['a']))
+ del l
+ l = s.split('a')
+ self.assertEquals(len(l), chunksize + 1)
+ self.assertEquals(set(l), set(['', ' ' * chunksize]))
+
+ # Allocates a string of twice size (and briefly two) and a list of
+ # size. Because of internal affairs, the s.split() call produces a
+ # list of size times the same one-character string, so we only
+ # suffer for the list size. (Otherwise, it'd cost another 48 times
+ # size in bytes!) Nevertheless, a list of size takes
+ # 8*size bytes.
+ @bigmemtest(minsize=_2G + 5, memuse=10)
+ def test_split_large(self, size):
+ s = ' a' * size + ' '
+ l = s.split()
+ self.assertEquals(len(l), size)
+ self.assertEquals(set(l), set(['a']))
+ del l
+ l = s.split('a')
+ self.assertEquals(len(l), size + 1)
+ self.assertEquals(set(l), set([' ']))
+
+ @bigmemtest(minsize=_2G, memuse=2.1)
+ def test_splitlines(self, size):
+ # Crudely calculate an estimate so that the result of s.split won't
+ # take up an inordinate amount of memory
+ chunksize = int(size ** 0.5 + 2) // 2
+ SUBSTR = ' ' * chunksize + '\n' + ' ' * chunksize + '\r\n'
+ s = SUBSTR * chunksize
+ l = s.splitlines()
+ self.assertEquals(len(l), chunksize * 2)
+ self.assertEquals(set(l), set([' ' * chunksize]))
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_startswith(self, size):
+ SUBSTR = ' abc def ghi'
+ s = '-' * size + SUBSTR
+ self.failUnless(s.startswith(s))
+ self.failUnless(s.startswith('-' * size))
+ self.failIf(s.startswith(SUBSTR))
+
+ @bigmemtest(minsize=_2G, memuse=1)
+ def test_strip(self, size):
+ SUBSTR = ' abc def ghi '
+ s = SUBSTR.rjust(size)
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.strip(), SUBSTR.strip())
+ del s
+ s = SUBSTR.ljust(size)
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.strip(), SUBSTR.strip())
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_swapcase(self, size):
+ SUBSTR = "aBcDeFG12.'\xa9\x00"
+ sublen = len(SUBSTR)
+ repeats = size // sublen + 2
+ s = SUBSTR * repeats
+ s = s.swapcase()
+ self.assertEquals(len(s), sublen * repeats)
+ self.assertEquals(s[:sublen * 3], SUBSTR.swapcase() * 3)
+ self.assertEquals(s[-sublen * 3:], SUBSTR.swapcase() * 3)
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_title(self, size):
+ SUBSTR = 'SpaaHAaaAaham'
+ s = SUBSTR * (size // len(SUBSTR) + 2)
+ s = s.title()
+ self.failUnless(s.startswith((SUBSTR * 3).title()))
+ self.failUnless(s.endswith(SUBSTR.lower() * 3))
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_translate(self, size):
+ trans = string.maketrans('.aZ', '-!$')
+ SUBSTR = 'aZz.z.Aaz.'
+ sublen = len(SUBSTR)
+ repeats = size // sublen + 2
+ s = SUBSTR * repeats
+ s = s.translate(trans)
+ self.assertEquals(len(s), repeats * sublen)
+ self.assertEquals(s[:sublen], SUBSTR.translate(trans))
+ self.assertEquals(s[-sublen:], SUBSTR.translate(trans))
+ self.assertEquals(s.count('.'), 0)
+ self.assertEquals(s.count('!'), repeats * 2)
+ self.assertEquals(s.count('z'), repeats * 3)
+
+ @bigmemtest(minsize=_2G + 5, memuse=2)
+ def test_upper(self, size):
+ s = 'a' * size
+ s = s.upper()
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.count('A'), size)
+
+ @bigmemtest(minsize=_2G + 20, memuse=1)
+ def test_zfill(self, size):
+ SUBSTR = '-568324723598234'
+ s = SUBSTR.zfill(size)
+ self.failUnless(s.endswith('0' + SUBSTR[1:]))
+ self.failUnless(s.startswith('-0'))
+ self.assertEquals(len(s), size)
+ self.assertEquals(s.count('0'), size - len(SUBSTR))
+
+ @bigmemtest(minsize=_2G + 10, memuse=2)
+ def test_format(self, size):
+ s = '-' * size
+ sf = '%s' % (s,)
+ self.failUnless(s == sf)
+ del sf
+ sf = '..%s..' % (s,)
+ self.assertEquals(len(sf), len(s) + 4)
+ self.failUnless(sf.startswith('..-'))
+ self.failUnless(sf.endswith('-..'))
+ del s, sf
+
+ size //= 2
+ edge = '-' * size
+ s = ''.join([edge, '%s', edge])
+ del edge
+ s = s % '...'
+ self.assertEquals(len(s), size * 2 + 3)
+ self.assertEquals(s.count('.'), 3)
+ self.assertEquals(s.count('-'), size * 2)
+
+ @bigmemtest(minsize=_2G + 10, memuse=2)
+ def test_repr_small(self, size):
+ s = '-' * size
+ s = repr(s)
+ self.assertEquals(len(s), size + 2)
+ self.assertEquals(s[0], "'")
+ self.assertEquals(s[-1], "'")
+ self.assertEquals(s.count('-'), size)
+ del s
+ # repr() will create a string four times as large as this 'binary
+ # string', but we don't want to allocate much more than twice
+ # size in total. (We do extra testing in test_repr_large())
+ size = size // 5 * 2
+ s = '\x00' * size
+ s = repr(s)
+ self.assertEquals(len(s), size * 4 + 2)
+ self.assertEquals(s[0], "'")
+ self.assertEquals(s[-1], "'")
+ self.assertEquals(s.count('\\'), size)
+ self.assertEquals(s.count('0'), size * 2)
+
+ @bigmemtest(minsize=_2G + 10, memuse=5)
+ def test_repr_large(self, size):
+ s = '\x00' * size
+ s = repr(s)
+ self.assertEquals(len(s), size * 4 + 2)
+ self.assertEquals(s[0], "'")
+ self.assertEquals(s[-1], "'")
+ self.assertEquals(s.count('\\'), size)
+ self.assertEquals(s.count('0'), size * 2)
+
+ # This test is meaningful even with size < 2G, as long as the
+ # doubled string is > 2G (but it tests more if both are > 2G :)
+ @bigmemtest(minsize=_1G + 2, memuse=3)
+ def test_concat(self, size):
+ s = '.' * size
+ self.assertEquals(len(s), size)
+ s = s + s
+ self.assertEquals(len(s), size * 2)
+ self.assertEquals(s.count('.'), size * 2)
+
+ # This test is meaningful even with size < 2G, as long as the
+ # repeated string is > 2G (but it tests more if both are > 2G :)
+ @bigmemtest(minsize=_1G + 2, memuse=3)
+ def test_repeat(self, size):
+ s = '.' * size
+ self.assertEquals(len(s), size)
+ s = s * 2
+ self.assertEquals(len(s), size * 2)
+ self.assertEquals(s.count('.'), size * 2)
+
+ @bigmemtest(minsize=_2G + 20, memuse=1)
+ def test_slice_and_getitem(self, size):
+ SUBSTR = '0123456789'
+ sublen = len(SUBSTR)
+ s = SUBSTR * (size // sublen)
+ stepsize = len(s) // 100
+ stepsize = stepsize - (stepsize % sublen)
+ for i in range(0, len(s) - stepsize, stepsize):
+ self.assertEquals(s[i], SUBSTR[0])
+ self.assertEquals(s[i:i + sublen], SUBSTR)
+ self.assertEquals(s[i:i + sublen:2], SUBSTR[::2])
+ if i > 0:
+ self.assertEquals(s[i + sublen - 1:i - 1:-3],
+ SUBSTR[sublen::-3])
+ # Make sure we do some slicing and indexing near the end of the
+ # string, too.
+ self.assertEquals(s[len(s) - 1], SUBSTR[-1])
+ self.assertEquals(s[-1], SUBSTR[-1])
+ self.assertEquals(s[len(s) - 10], SUBSTR[0])
+ self.assertEquals(s[-sublen], SUBSTR[0])
+ self.assertEquals(s[len(s):], '')
+ self.assertEquals(s[len(s) - 1:], SUBSTR[-1])
+ self.assertEquals(s[-1:], SUBSTR[-1])
+ self.assertEquals(s[len(s) - sublen:], SUBSTR)
+ self.assertEquals(s[-sublen:], SUBSTR)
+ self.assertEquals(len(s[:]), len(s))
+ self.assertEquals(len(s[:len(s) - 5]), len(s) - 5)
+ self.assertEquals(len(s[5:-5]), len(s) - 10)
+
+ self.assertRaises(IndexError, operator.getitem, s, len(s))
+ self.assertRaises(IndexError, operator.getitem, s, len(s) + 1)
+ self.assertRaises(IndexError, operator.getitem, s, len(s) + 1<<31)
+
+ @bigmemtest(minsize=_2G, memuse=2)
+ def test_contains(self, size):
+ SUBSTR = '0123456789'
+ edge = '-' * (size // 2)
+ s = ''.join([edge, SUBSTR, edge])
+ del edge
+ self.failUnless(SUBSTR in s)
+ self.failIf(SUBSTR * 2 in s)
+ self.failUnless('-' in s)
+ self.failIf('a' in s)
+ s += 'a'
+ self.failUnless('a' in s)
+
+ @bigmemtest(minsize=_2G + 10, memuse=2)
+ def test_compare(self, size):
+ s1 = '-' * size
+ s2 = '-' * size
+ self.failUnless(s1 == s2)
+ del s2
+ s2 = s1 + 'a'
+ self.failIf(s1 == s2)
+ del s2
+ s2 = '.' * size
+ self.failIf(s1 == s2)
+
+ @bigmemtest(minsize=_2G + 10, memuse=1)
+ def test_hash(self, size):
+ # Not sure if we can do any meaningful tests here... Even if we
+ # start relying on the exact algorithm used, the result will be
+ # different depending on the size of the C 'long int'. Even this
+ # test is dodgy (there's no *guarantee* that the two things should
+ # have a different hash, even if they, in the current
+ # implementation, almost always do.)
+ s = '\x00' * size
+ h1 = hash(s)
+ del s
+ s = '\x00' * (size + 1)
+ self.failIf(h1 == hash(s))
+
+class TupleTest(unittest.TestCase):
+
+ # Tuples have a small, fixed-sized head and an array of pointers to
+ # data. Since we're testing 64-bit addressing, we can assume that the
+ # pointers are 8 bytes, and that thus that the tuples take up 8 bytes
+ # per size.
+
+ # As a side-effect of testing long tuples, these tests happen to test
+ # having more than 2<<31 references to any given object. Hence the
+ # use of different types of objects as contents in different tests.
+
+ @bigmemtest(minsize=_2G + 2, memuse=16)
+ def test_compare(self, size):
+ t1 = (u'',) * size
+ t2 = (u'',) * size
+ self.failUnless(t1 == t2)
+ del t2
+ t2 = (u'',) * (size + 1)
+ self.failIf(t1 == t2)
+ del t2
+ t2 = (1,) * size
+ self.failIf(t1 == t2)
+
+ # Test concatenating into a single tuple of more than 2G in length,
+ # and concatenating a tuple of more than 2G in length separately, so
+ # the smaller test still gets run even if there isn't memory for the
+ # larger test (but we still let the tester know the larger test is
+ # skipped, in verbose mode.)
+ def basic_concat_test(self, size):
+ t = ((),) * size
+ self.assertEquals(len(t), size)
+ t = t + t
+ self.assertEquals(len(t), size * 2)
+
+ @bigmemtest(minsize=_2G // 2 + 2, memuse=24)
+ def test_concat_small(self, size):
+ return self.basic_concat_test(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=24)
+ def test_concat_large(self, size):
+ return self.basic_concat_test(size)
+
+ @bigmemtest(minsize=_2G // 5 + 10, memuse=8 * 5)
+ def test_contains(self, size):
+ t = (1, 2, 3, 4, 5) * size
+ self.assertEquals(len(t), size * 5)
+ self.failUnless(5 in t)
+ self.failIf((1, 2, 3, 4, 5) in t)
+ self.failIf(0 in t)
+
+ @bigmemtest(minsize=_2G + 10, memuse=8)
+ def test_hash(self, size):
+ t1 = (0,) * size
+ h1 = hash(t1)
+ del t1
+ t2 = (0,) * (size + 1)
+ self.failIf(h1 == hash(t2))
+
+ @bigmemtest(minsize=_2G + 10, memuse=8)
+ def test_index_and_slice(self, size):
+ t = (None,) * size
+ self.assertEquals(len(t), size)
+ self.assertEquals(t[-1], None)
+ self.assertEquals(t[5], None)
+ self.assertEquals(t[size - 1], None)
+ self.assertRaises(IndexError, operator.getitem, t, size)
+ self.assertEquals(t[:5], (None,) * 5)
+ self.assertEquals(t[-5:], (None,) * 5)
+ self.assertEquals(t[20:25], (None,) * 5)
+ self.assertEquals(t[-25:-20], (None,) * 5)
+ self.assertEquals(t[size - 5:], (None,) * 5)
+ self.assertEquals(t[size - 5:size], (None,) * 5)
+ self.assertEquals(t[size - 6:size - 2], (None,) * 4)
+ self.assertEquals(t[size:size], ())
+ self.assertEquals(t[size:size+5], ())
+
+ # Like test_concat, split in two.
+ def basic_test_repeat(self, size):
+ t = ('',) * size
+ self.assertEquals(len(t), size)
+ t = t * 2
+ self.assertEquals(len(t), size * 2)
+
+ @bigmemtest(minsize=_2G // 2 + 2, memuse=24)
+ def test_repeat_small(self, size):
+ return self.basic_test_repeat(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=24)
+ def test_repeat_large(self, size):
+ return self.basic_test_repeat(size)
+
+ # Like test_concat, split in two.
+ def basic_test_repr(self, size):
+ t = (0,) * size
+ s = repr(t)
+ # The repr of a tuple of 0's is exactly three times the tuple length.
+ self.assertEquals(len(s), size * 3)
+ self.assertEquals(s[:5], '(0, 0')
+ self.assertEquals(s[-5:], '0, 0)')
+ self.assertEquals(s.count('0'), size)
+
+ @bigmemtest(minsize=_2G // 3 + 2, memuse=8 + 3)
+ def test_repr_small(self, size):
+ return self.basic_test_repr(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=8 + 3)
+ def test_repr_large(self, size):
+ return self.basic_test_repr(size)
+
+class ListTest(unittest.TestCase):
+
+ # Like tuples, lists have a small, fixed-sized head and an array of
+ # pointers to data, so 8 bytes per size. Also like tuples, we make the
+ # lists hold references to various objects to test their refcount
+ # limits.
+
+ @bigmemtest(minsize=_2G + 2, memuse=16)
+ def test_compare(self, size):
+ l1 = [u''] * size
+ l2 = [u''] * size
+ self.failUnless(l1 == l2)
+ del l2
+ l2 = [u''] * (size + 1)
+ self.failIf(l1 == l2)
+ del l2
+ l2 = [2] * size
+ self.failIf(l1 == l2)
+
+ # Test concatenating into a single list of more than 2G in length,
+ # and concatenating a list of more than 2G in length separately, so
+ # the smaller test still gets run even if there isn't memory for the
+ # larger test (but we still let the tester know the larger test is
+ # skipped, in verbose mode.)
+ def basic_test_concat(self, size):
+ l = [[]] * size
+ self.assertEquals(len(l), size)
+ l = l + l
+ self.assertEquals(len(l), size * 2)
+
+ @bigmemtest(minsize=_2G // 2 + 2, memuse=24)
+ def test_concat_small(self, size):
+ return self.basic_test_concat(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=24)
+ def test_concat_large(self, size):
+ return self.basic_test_concat(size)
+
+ def basic_test_inplace_concat(self, size):
+ l = [sys.stdout] * size
+ l += l
+ self.assertEquals(len(l), size * 2)
+ self.failUnless(l[0] is l[-1])
+ self.failUnless(l[size - 1] is l[size + 1])
+
+ @bigmemtest(minsize=_2G // 2 + 2, memuse=24)
+ def test_inplace_concat_small(self, size):
+ return self.basic_test_inplace_concat(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=24)
+ def test_inplace_concat_large(self, size):
+ return self.basic_test_inplace_concat(size)
+
+ @bigmemtest(minsize=_2G // 5 + 10, memuse=8 * 5)
+ def test_contains(self, size):
+ l = [1, 2, 3, 4, 5] * size
+ self.assertEquals(len(l), size * 5)
+ self.failUnless(5 in l)
+ self.failIf([1, 2, 3, 4, 5] in l)
+ self.failIf(0 in l)
+
+ @bigmemtest(minsize=_2G + 10, memuse=8)
+ def test_hash(self, size):
+ l = [0] * size
+ self.failUnlessRaises(TypeError, hash, l)
+
+ @bigmemtest(minsize=_2G + 10, memuse=8)
+ def test_index_and_slice(self, size):
+ l = [None] * size
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-1], None)
+ self.assertEquals(l[5], None)
+ self.assertEquals(l[size - 1], None)
+ self.assertRaises(IndexError, operator.getitem, l, size)
+ self.assertEquals(l[:5], [None] * 5)
+ self.assertEquals(l[-5:], [None] * 5)
+ self.assertEquals(l[20:25], [None] * 5)
+ self.assertEquals(l[-25:-20], [None] * 5)
+ self.assertEquals(l[size - 5:], [None] * 5)
+ self.assertEquals(l[size - 5:size], [None] * 5)
+ self.assertEquals(l[size - 6:size - 2], [None] * 4)
+ self.assertEquals(l[size:size], [])
+ self.assertEquals(l[size:size+5], [])
+
+ l[size - 2] = 5
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-3:], [None, 5, None])
+ self.assertEquals(l.count(5), 1)
+ self.assertRaises(IndexError, operator.setitem, l, size, 6)
+ self.assertEquals(len(l), size)
+
+ l[size - 7:] = [1, 2, 3, 4, 5]
+ size -= 2
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-7:], [None, None, 1, 2, 3, 4, 5])
+
+ l[:7] = [1, 2, 3, 4, 5]
+ size -= 2
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[:7], [1, 2, 3, 4, 5, None, None])
+
+ del l[size - 1]
+ size -= 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-1], 4)
+
+ del l[-2:]
+ size -= 2
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-1], 2)
+
+ del l[0]
+ size -= 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[0], 2)
+
+ del l[:2]
+ size -= 2
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[0], 4)
+
+ # Like test_concat, split in two.
+ def basic_test_repeat(self, size):
+ l = [] * size
+ self.failIf(l)
+ l = [''] * size
+ self.assertEquals(len(l), size)
+ l = l * 2
+ self.assertEquals(len(l), size * 2)
+
+ @bigmemtest(minsize=_2G // 2 + 2, memuse=24)
+ def test_repeat_small(self, size):
+ return self.basic_test_repeat(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=24)
+ def test_repeat_large(self, size):
+ return self.basic_test_repeat(size)
+
+ def basic_test_inplace_repeat(self, size):
+ l = ['']
+ l *= size
+ self.assertEquals(len(l), size)
+ self.failUnless(l[0] is l[-1])
+ del l
+
+ l = [''] * size
+ l *= 2
+ self.assertEquals(len(l), size * 2)
+ self.failUnless(l[size - 1] is l[-1])
+
+ @bigmemtest(minsize=_2G // 2 + 2, memuse=16)
+ def test_inplace_repeat_small(self, size):
+ return self.basic_test_inplace_repeat(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=16)
+ def test_inplace_repeat_large(self, size):
+ return self.basic_test_inplace_repeat(size)
+
+ def basic_test_repr(self, size):
+ l = [0] * size
+ s = repr(l)
+ # The repr of a list of 0's is exactly three times the list length.
+ self.assertEquals(len(s), size * 3)
+ self.assertEquals(s[:5], '[0, 0')
+ self.assertEquals(s[-5:], '0, 0]')
+ self.assertEquals(s.count('0'), size)
+
+ @bigmemtest(minsize=_2G // 3 + 2, memuse=8 + 3)
+ def test_repr_small(self, size):
+ return self.basic_test_repr(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=8 + 3)
+ def test_repr_large(self, size):
+ return self.basic_test_repr(size)
+
+ # list overallocates ~1/8th of the total size (on first expansion) so
+ # the single list.append call puts memuse at 9 bytes per size.
+ @bigmemtest(minsize=_2G, memuse=9)
+ def test_append(self, size):
+ l = [object()] * size
+ l.append(object())
+ self.assertEquals(len(l), size+1)
+ self.failUnless(l[-3] is l[-2])
+ self.failIf(l[-2] is l[-1])
+
+ @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5)
+ def test_count(self, size):
+ l = [1, 2, 3, 4, 5] * size
+ self.assertEquals(l.count(1), size)
+ self.assertEquals(l.count("1"), 0)
+
+ def basic_test_extend(self, size):
+ l = [file] * size
+ l.extend(l)
+ self.assertEquals(len(l), size * 2)
+ self.failUnless(l[0] is l[-1])
+ self.failUnless(l[size - 1] is l[size + 1])
+
+ @bigmemtest(minsize=_2G // 2 + 2, memuse=16)
+ def test_extend_small(self, size):
+ return self.basic_test_extend(size)
+
+ @bigmemtest(minsize=_2G + 2, memuse=16)
+ def test_extend_large(self, size):
+ return self.basic_test_extend(size)
+
+ @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5)
+ def test_index(self, size):
+ l = [1L, 2L, 3L, 4L, 5L] * size
+ size *= 5
+ self.assertEquals(l.index(1), 0)
+ self.assertEquals(l.index(5, size - 5), size - 1)
+ self.assertEquals(l.index(5, size - 5, size), size - 1)
+ self.assertRaises(ValueError, l.index, 1, size - 4, size)
+ self.assertRaises(ValueError, l.index, 6L)
+
+ # This tests suffers from overallocation, just like test_append.
+ @bigmemtest(minsize=_2G + 10, memuse=9)
+ def test_insert(self, size):
+ l = [1.0] * size
+ l.insert(size - 1, "A")
+ size += 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-3:], [1.0, "A", 1.0])
+
+ l.insert(size + 1, "B")
+ size += 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-3:], ["A", 1.0, "B"])
+
+ l.insert(1, "C")
+ size += 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[:3], [1.0, "C", 1.0])
+ self.assertEquals(l[size - 3:], ["A", 1.0, "B"])
+
+ @bigmemtest(minsize=_2G // 5 + 4, memuse=8 * 5)
+ def test_pop(self, size):
+ l = [u"a", u"b", u"c", u"d", u"e"] * size
+ size *= 5
+ self.assertEquals(len(l), size)
+
+ item = l.pop()
+ size -= 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(item, u"e")
+ self.assertEquals(l[-2:], [u"c", u"d"])
+
+ item = l.pop(0)
+ size -= 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(item, u"a")
+ self.assertEquals(l[:2], [u"b", u"c"])
+
+ item = l.pop(size - 2)
+ size -= 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(item, u"c")
+ self.assertEquals(l[-2:], [u"b", u"d"])
+
+ @bigmemtest(minsize=_2G + 10, memuse=8)
+ def test_remove(self, size):
+ l = [10] * size
+ self.assertEquals(len(l), size)
+
+ l.remove(10)
+ size -= 1
+ self.assertEquals(len(l), size)
+
+ # Because of the earlier l.remove(), this append doesn't trigger
+ # a resize.
+ l.append(5)
+ size += 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-2:], [10, 5])
+ l.remove(5)
+ size -= 1
+ self.assertEquals(len(l), size)
+ self.assertEquals(l[-2:], [10, 10])
+
+ @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5)
+ def test_reverse(self, size):
+ l = [1, 2, 3, 4, 5] * size
+ l.reverse()
+ self.assertEquals(len(l), size * 5)
+ self.assertEquals(l[-5:], [5, 4, 3, 2, 1])
+ self.assertEquals(l[:5], [5, 4, 3, 2, 1])
+
+ @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5)
+ def test_sort(self, size):
+ l = [1, 2, 3, 4, 5] * size
+ l.sort()
+ self.assertEquals(len(l), size * 5)
+ self.assertEquals(l.count(1), size)
+ self.assertEquals(l[:10], [1] * 10)
+ self.assertEquals(l[-10:], [5] * 10)
+
+def test_main():
+ test_support.run_unittest(StrTest, TupleTest, ListTest)
+
+if __name__ == '__main__':
+ if len(sys.argv) > 1:
+ test_support.set_memlimit(sys.argv[1])
+ test_main()
diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py
index ef4f407..71e2b0a 100644
--- a/Lib/test/test_builtin.py
+++ b/Lib/test/test_builtin.py
@@ -1,7 +1,8 @@
# Python test set -- built-in functions
import test.test_support, unittest
-from test.test_support import fcmp, have_unicode, TESTFN, unlink, run_unittest
+from test.test_support import fcmp, have_unicode, TESTFN, unlink, \
+ run_unittest, run_with_locale
from operator import neg
import sys, warnings, cStringIO, random, UserDict
@@ -528,33 +529,20 @@ class BuiltinTest(unittest.TestCase):
# Implementation limitation in PyFloat_FromString()
self.assertRaises(ValueError, float, unicode("1"*10000))
+ @run_with_locale('LC_NUMERIC', 'fr_FR', 'de_DE')
def test_float_with_comma(self):
# set locale to something that doesn't use '.' for the decimal point
- try:
- import locale
- orig_locale = locale.setlocale(locale.LC_NUMERIC)
- locale.setlocale(locale.LC_NUMERIC, 'fr_FR')
- except:
- # if we can't set the locale, just ignore this test
- return
-
- try:
- self.assertEqual(locale.localeconv()['decimal_point'], ',')
- except:
- # this test is worthless, just skip it and reset the locale
- locale.setlocale(locale.LC_NUMERIC, orig_locale)
+ import locale
+ if not locale.localeconv()['decimal_point'] == ',':
return
- try:
- self.assertEqual(float(" 3,14 "), 3.14)
- self.assertEqual(float(" +3,14 "), 3.14)
- self.assertEqual(float(" -3,14 "), -3.14)
- self.assertRaises(ValueError, float, " 0x3.1 ")
- self.assertRaises(ValueError, float, " -0x3.p-1 ")
- self.assertEqual(float(" 25.e-1 "), 2.5)
- self.assertEqual(fcmp(float(" .25e-1 "), .025), 0)
- finally:
- locale.setlocale(locale.LC_NUMERIC, orig_locale)
+ self.assertEqual(float(" 3,14 "), 3.14)
+ self.assertEqual(float(" +3,14 "), 3.14)
+ self.assertEqual(float(" -3,14 "), -3.14)
+ self.assertRaises(ValueError, float, " 0x3.1 ")
+ self.assertRaises(ValueError, float, " -0x3.p-1 ")
+ self.assertEqual(float(" 25.e-1 "), 2.5)
+ self.assertEqual(fcmp(float(" .25e-1 "), .025), 0)
def test_floatconversion(self):
# Make sure that calls to __float__() work properly
@@ -693,6 +681,84 @@ class BuiltinTest(unittest.TestCase):
self.assertEqual(int('0123', 0), 83)
self.assertEqual(int('0x123', 16), 291)
+ # SF bug 1334662: int(string, base) wrong answers
+ # Various representations of 2**32 evaluated to 0
+ # rather than 2**32 in previous versions
+
+ self.assertEqual(int('100000000000000000000000000000000', 2), 4294967296L)
+ self.assertEqual(int('102002022201221111211', 3), 4294967296L)
+ self.assertEqual(int('10000000000000000', 4), 4294967296L)
+ self.assertEqual(int('32244002423141', 5), 4294967296L)
+ self.assertEqual(int('1550104015504', 6), 4294967296L)
+ self.assertEqual(int('211301422354', 7), 4294967296L)
+ self.assertEqual(int('40000000000', 8), 4294967296L)
+ self.assertEqual(int('12068657454', 9), 4294967296L)
+ self.assertEqual(int('4294967296', 10), 4294967296L)
+ self.assertEqual(int('1904440554', 11), 4294967296L)
+ self.assertEqual(int('9ba461594', 12), 4294967296L)
+ self.assertEqual(int('535a79889', 13), 4294967296L)
+ self.assertEqual(int('2ca5b7464', 14), 4294967296L)
+ self.assertEqual(int('1a20dcd81', 15), 4294967296L)
+ self.assertEqual(int('100000000', 16), 4294967296L)
+ self.assertEqual(int('a7ffda91', 17), 4294967296L)
+ self.assertEqual(int('704he7g4', 18), 4294967296L)
+ self.assertEqual(int('4f5aff66', 19), 4294967296L)
+ self.assertEqual(int('3723ai4g', 20), 4294967296L)
+ self.assertEqual(int('281d55i4', 21), 4294967296L)
+ self.assertEqual(int('1fj8b184', 22), 4294967296L)
+ self.assertEqual(int('1606k7ic', 23), 4294967296L)
+ self.assertEqual(int('mb994ag', 24), 4294967296L)
+ self.assertEqual(int('hek2mgl', 25), 4294967296L)
+ self.assertEqual(int('dnchbnm', 26), 4294967296L)
+ self.assertEqual(int('b28jpdm', 27), 4294967296L)
+ self.assertEqual(int('8pfgih4', 28), 4294967296L)
+ self.assertEqual(int('76beigg', 29), 4294967296L)
+ self.assertEqual(int('5qmcpqg', 30), 4294967296L)
+ self.assertEqual(int('4q0jto4', 31), 4294967296L)
+ self.assertEqual(int('4000000', 32), 4294967296L)
+ self.assertEqual(int('3aokq94', 33), 4294967296L)
+ self.assertEqual(int('2qhxjli', 34), 4294967296L)
+ self.assertEqual(int('2br45qb', 35), 4294967296L)
+ self.assertEqual(int('1z141z4', 36), 4294967296L)
+
+ # SF bug 1334662: int(string, base) wrong answers
+ # Checks for proper evaluation of 2**32 + 1
+ self.assertEqual(int('100000000000000000000000000000001', 2), 4294967297L)
+ self.assertEqual(int('102002022201221111212', 3), 4294967297L)
+ self.assertEqual(int('10000000000000001', 4), 4294967297L)
+ self.assertEqual(int('32244002423142', 5), 4294967297L)
+ self.assertEqual(int('1550104015505', 6), 4294967297L)
+ self.assertEqual(int('211301422355', 7), 4294967297L)
+ self.assertEqual(int('40000000001', 8), 4294967297L)
+ self.assertEqual(int('12068657455', 9), 4294967297L)
+ self.assertEqual(int('4294967297', 10), 4294967297L)
+ self.assertEqual(int('1904440555', 11), 4294967297L)
+ self.assertEqual(int('9ba461595', 12), 4294967297L)
+ self.assertEqual(int('535a7988a', 13), 4294967297L)
+ self.assertEqual(int('2ca5b7465', 14), 4294967297L)
+ self.assertEqual(int('1a20dcd82', 15), 4294967297L)
+ self.assertEqual(int('100000001', 16), 4294967297L)
+ self.assertEqual(int('a7ffda92', 17), 4294967297L)
+ self.assertEqual(int('704he7g5', 18), 4294967297L)
+ self.assertEqual(int('4f5aff67', 19), 4294967297L)
+ self.assertEqual(int('3723ai4h', 20), 4294967297L)
+ self.assertEqual(int('281d55i5', 21), 4294967297L)
+ self.assertEqual(int('1fj8b185', 22), 4294967297L)
+ self.assertEqual(int('1606k7id', 23), 4294967297L)
+ self.assertEqual(int('mb994ah', 24), 4294967297L)
+ self.assertEqual(int('hek2mgm', 25), 4294967297L)
+ self.assertEqual(int('dnchbnn', 26), 4294967297L)
+ self.assertEqual(int('b28jpdn', 27), 4294967297L)
+ self.assertEqual(int('8pfgih5', 28), 4294967297L)
+ self.assertEqual(int('76beigh', 29), 4294967297L)
+ self.assertEqual(int('5qmcpqh', 30), 4294967297L)
+ self.assertEqual(int('4q0jto5', 31), 4294967297L)
+ self.assertEqual(int('4000001', 32), 4294967297L)
+ self.assertEqual(int('3aokq95', 33), 4294967297L)
+ self.assertEqual(int('2qhxjlj', 34), 4294967297L)
+ self.assertEqual(int('2br45qc', 35), 4294967297L)
+ self.assertEqual(int('1z141z5', 36), 4294967297L)
+
def test_intconversion(self):
# Test __int__()
class Foo0:
@@ -886,6 +952,81 @@ class BuiltinTest(unittest.TestCase):
self.assertRaises(ValueError, long, '53', 40)
self.assertRaises(TypeError, long, 1, 12)
+ self.assertEqual(long('100000000000000000000000000000000', 2),
+ 4294967296)
+ self.assertEqual(long('102002022201221111211', 3), 4294967296)
+ self.assertEqual(long('10000000000000000', 4), 4294967296)
+ self.assertEqual(long('32244002423141', 5), 4294967296)
+ self.assertEqual(long('1550104015504', 6), 4294967296)
+ self.assertEqual(long('211301422354', 7), 4294967296)
+ self.assertEqual(long('40000000000', 8), 4294967296)
+ self.assertEqual(long('12068657454', 9), 4294967296)
+ self.assertEqual(long('4294967296', 10), 4294967296)
+ self.assertEqual(long('1904440554', 11), 4294967296)
+ self.assertEqual(long('9ba461594', 12), 4294967296)
+ self.assertEqual(long('535a79889', 13), 4294967296)
+ self.assertEqual(long('2ca5b7464', 14), 4294967296)
+ self.assertEqual(long('1a20dcd81', 15), 4294967296)
+ self.assertEqual(long('100000000', 16), 4294967296)
+ self.assertEqual(long('a7ffda91', 17), 4294967296)
+ self.assertEqual(long('704he7g4', 18), 4294967296)
+ self.assertEqual(long('4f5aff66', 19), 4294967296)
+ self.assertEqual(long('3723ai4g', 20), 4294967296)
+ self.assertEqual(long('281d55i4', 21), 4294967296)
+ self.assertEqual(long('1fj8b184', 22), 4294967296)
+ self.assertEqual(long('1606k7ic', 23), 4294967296)
+ self.assertEqual(long('mb994ag', 24), 4294967296)
+ self.assertEqual(long('hek2mgl', 25), 4294967296)
+ self.assertEqual(long('dnchbnm', 26), 4294967296)
+ self.assertEqual(long('b28jpdm', 27), 4294967296)
+ self.assertEqual(long('8pfgih4', 28), 4294967296)
+ self.assertEqual(long('76beigg', 29), 4294967296)
+ self.assertEqual(long('5qmcpqg', 30), 4294967296)
+ self.assertEqual(long('4q0jto4', 31), 4294967296)
+ self.assertEqual(long('4000000', 32), 4294967296)
+ self.assertEqual(long('3aokq94', 33), 4294967296)
+ self.assertEqual(long('2qhxjli', 34), 4294967296)
+ self.assertEqual(long('2br45qb', 35), 4294967296)
+ self.assertEqual(long('1z141z4', 36), 4294967296)
+
+ self.assertEqual(long('100000000000000000000000000000001', 2),
+ 4294967297)
+ self.assertEqual(long('102002022201221111212', 3), 4294967297)
+ self.assertEqual(long('10000000000000001', 4), 4294967297)
+ self.assertEqual(long('32244002423142', 5), 4294967297)
+ self.assertEqual(long('1550104015505', 6), 4294967297)
+ self.assertEqual(long('211301422355', 7), 4294967297)
+ self.assertEqual(long('40000000001', 8), 4294967297)
+ self.assertEqual(long('12068657455', 9), 4294967297)
+ self.assertEqual(long('4294967297', 10), 4294967297)
+ self.assertEqual(long('1904440555', 11), 4294967297)
+ self.assertEqual(long('9ba461595', 12), 4294967297)
+ self.assertEqual(long('535a7988a', 13), 4294967297)
+ self.assertEqual(long('2ca5b7465', 14), 4294967297)
+ self.assertEqual(long('1a20dcd82', 15), 4294967297)
+ self.assertEqual(long('100000001', 16), 4294967297)
+ self.assertEqual(long('a7ffda92', 17), 4294967297)
+ self.assertEqual(long('704he7g5', 18), 4294967297)
+ self.assertEqual(long('4f5aff67', 19), 4294967297)
+ self.assertEqual(long('3723ai4h', 20), 4294967297)
+ self.assertEqual(long('281d55i5', 21), 4294967297)
+ self.assertEqual(long('1fj8b185', 22), 4294967297)
+ self.assertEqual(long('1606k7id', 23), 4294967297)
+ self.assertEqual(long('mb994ah', 24), 4294967297)
+ self.assertEqual(long('hek2mgm', 25), 4294967297)
+ self.assertEqual(long('dnchbnn', 26), 4294967297)
+ self.assertEqual(long('b28jpdn', 27), 4294967297)
+ self.assertEqual(long('8pfgih5', 28), 4294967297)
+ self.assertEqual(long('76beigh', 29), 4294967297)
+ self.assertEqual(long('5qmcpqh', 30), 4294967297)
+ self.assertEqual(long('4q0jto5', 31), 4294967297)
+ self.assertEqual(long('4000001', 32), 4294967297)
+ self.assertEqual(long('3aokq95', 33), 4294967297)
+ self.assertEqual(long('2qhxjlj', 34), 4294967297)
+ self.assertEqual(long('2br45qc', 35), 4294967297)
+ self.assertEqual(long('1z141z5', 36), 4294967297)
+
+
def test_longconversion(self):
# Test __long__()
class Foo0:
diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py
index 018bec6..ec860d1 100644
--- a/Lib/test/test_cmd_line.py
+++ b/Lib/test/test_cmd_line.py
@@ -15,8 +15,11 @@ class CmdLineTest(unittest.TestCase):
popen2._cleanup()
return data
- def exit_code(self, cmd_line):
- return subprocess.call([sys.executable, cmd_line], stderr=subprocess.PIPE)
+ def exit_code(self, *args):
+ cmd_line = [sys.executable]
+ cmd_line.extend(args)
+ return subprocess.call(cmd_line, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
def test_directories(self):
self.assertNotEqual(self.exit_code('.'), 0)
@@ -50,6 +53,38 @@ class CmdLineTest(unittest.TestCase):
version = 'Python %d.%d' % sys.version_info[:2]
self.assertTrue(self.start_python('-V').startswith(version))
+ def test_run_module(self):
+ # Test expected operation of the '-m' switch
+ # Switch needs an argument
+ self.assertNotEqual(self.exit_code('-m'), 0)
+ # Check we get an error for a nonexistent module
+ self.assertNotEqual(
+ self.exit_code('-m', 'fnord43520xyz'),
+ 0)
+ # Check the runpy module also gives an error for
+ # a nonexistent module
+ self.assertNotEqual(
+ self.exit_code('-m', 'runpy', 'fnord43520xyz'),
+ 0)
+ # All good if module is located and run successfully
+ self.assertEqual(
+ self.exit_code('-m', 'timeit', '-n', '1'),
+ 0)
+
+ def test_run_code(self):
+ # Test expected operation of the '-c' switch
+ # Switch needs an argument
+ self.assertNotEqual(self.exit_code('-c'), 0)
+ # Check we get an error for an uncaught exception
+ self.assertNotEqual(
+ self.exit_code('-c', 'raise Exception'),
+ 0)
+ # All good if execution is successful
+ self.assertEqual(
+ self.exit_code('-c', 'pass'),
+ 0)
+
+
def test_main():
test.test_support.run_unittest(CmdLineTest)
diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py
index c6e56c9..159c86d 100644
--- a/Lib/test/test_codeccallbacks.py
+++ b/Lib/test/test_codeccallbacks.py
@@ -18,30 +18,12 @@ class PosReturn:
self.pos = len(exc.object)
return (u"<?>", oldpos)
-# A UnicodeEncodeError object without a start attribute
-class NoStartUnicodeEncodeError(UnicodeEncodeError):
- def __init__(self):
- UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad")
- del self.start
-
# A UnicodeEncodeError object with a bad start attribute
class BadStartUnicodeEncodeError(UnicodeEncodeError):
def __init__(self):
UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad")
self.start = []
-# A UnicodeEncodeError object without an end attribute
-class NoEndUnicodeEncodeError(UnicodeEncodeError):
- def __init__(self):
- UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad")
- del self.end
-
-# A UnicodeEncodeError object without an object attribute
-class NoObjectUnicodeEncodeError(UnicodeEncodeError):
- def __init__(self):
- UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad")
- del self.object
-
# A UnicodeEncodeError object with a bad object attribute
class BadObjectUnicodeEncodeError(UnicodeEncodeError):
def __init__(self):
@@ -478,55 +460,15 @@ class CodecCallbackTest(unittest.TestCase):
UnicodeError("ouch")
)
self.assertRaises(
- AttributeError,
- codecs.replace_errors,
- NoStartUnicodeEncodeError()
- )
- self.assertRaises(
- TypeError,
- codecs.replace_errors,
- BadStartUnicodeEncodeError()
- )
- self.assertRaises(
- AttributeError,
- codecs.replace_errors,
- NoEndUnicodeEncodeError()
- )
- self.assertRaises(
- AttributeError,
- codecs.replace_errors,
- NoObjectUnicodeEncodeError()
- )
- self.assertRaises(
TypeError,
codecs.replace_errors,
BadObjectUnicodeEncodeError()
)
self.assertRaises(
- AttributeError,
- codecs.replace_errors,
- NoEndUnicodeDecodeError()
- )
- self.assertRaises(
TypeError,
codecs.replace_errors,
BadObjectUnicodeDecodeError()
)
- self.assertRaises(
- AttributeError,
- codecs.replace_errors,
- NoStartUnicodeTranslateError()
- )
- self.assertRaises(
- AttributeError,
- codecs.replace_errors,
- NoEndUnicodeTranslateError()
- )
- self.assertRaises(
- AttributeError,
- codecs.replace_errors,
- NoObjectUnicodeTranslateError()
- )
# With the correct exception, "replace" returns an "?" or u"\ufffd" replacement
self.assertEquals(
codecs.replace_errors(UnicodeEncodeError("ascii", u"\u3042", 0, 1, "ouch")),
@@ -565,21 +507,6 @@ class CodecCallbackTest(unittest.TestCase):
codecs.xmlcharrefreplace_errors,
UnicodeTranslateError(u"\u3042", 0, 1, "ouch")
)
- self.assertRaises(
- AttributeError,
- codecs.xmlcharrefreplace_errors,
- NoStartUnicodeEncodeError()
- )
- self.assertRaises(
- AttributeError,
- codecs.xmlcharrefreplace_errors,
- NoEndUnicodeEncodeError()
- )
- self.assertRaises(
- AttributeError,
- codecs.xmlcharrefreplace_errors,
- NoObjectUnicodeEncodeError()
- )
# Use the correct exception
cs = (0, 1, 9, 10, 99, 100, 999, 1000, 9999, 10000, 0x3042)
s = "".join(unichr(c) for c in cs)
diff --git a/Lib/test/test_codecencodings_cn.py b/Lib/test/test_codecencodings_cn.py
index 0638f4f..1bf8583 100644
--- a/Lib/test/test_codecencodings_cn.py
+++ b/Lib/test/test_codecencodings_cn.py
@@ -3,7 +3,6 @@
# test_codecencodings_cn.py
# Codec encoding tests for PRC encodings.
#
-# $CJKCodecs: test_codecencodings_cn.py,v 1.2 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecencodings_hk.py b/Lib/test/test_codecencodings_hk.py
index e7fad90..1cd020f 100644
--- a/Lib/test/test_codecencodings_hk.py
+++ b/Lib/test/test_codecencodings_hk.py
@@ -3,7 +3,6 @@
# test_codecencodings_hk.py
# Codec encoding tests for HongKong encodings.
#
-# $CJKCodecs: test_codecencodings_hk.py,v 1.1 2004/07/10 17:35:20 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecencodings_jp.py b/Lib/test/test_codecencodings_jp.py
index 483b7db..558598a 100644
--- a/Lib/test/test_codecencodings_jp.py
+++ b/Lib/test/test_codecencodings_jp.py
@@ -3,7 +3,6 @@
# test_codecencodings_jp.py
# Codec encoding tests for Japanese encodings.
#
-# $CJKCodecs: test_codecencodings_jp.py,v 1.3 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecencodings_kr.py b/Lib/test/test_codecencodings_kr.py
index 489c9f1..8139f76 100644
--- a/Lib/test/test_codecencodings_kr.py
+++ b/Lib/test/test_codecencodings_kr.py
@@ -3,7 +3,6 @@
# test_codecencodings_kr.py
# Codec encoding tests for ROK encodings.
#
-# $CJKCodecs: test_codecencodings_kr.py,v 1.2 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecencodings_tw.py b/Lib/test/test_codecencodings_tw.py
index fb8a4d0..7c59478 100644
--- a/Lib/test/test_codecencodings_tw.py
+++ b/Lib/test/test_codecencodings_tw.py
@@ -3,7 +3,6 @@
# test_codecencodings_tw.py
# Codec encoding tests for ROC encodings.
#
-# $CJKCodecs: test_codecencodings_tw.py,v 1.2 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecmaps_cn.py b/Lib/test/test_codecmaps_cn.py
index 25ecc02..8cbee76 100644
--- a/Lib/test/test_codecmaps_cn.py
+++ b/Lib/test/test_codecmaps_cn.py
@@ -3,7 +3,6 @@
# test_codecmaps_cn.py
# Codec mapping tests for PRC encodings
#
-# $CJKCodecs: test_codecmaps_cn.py,v 1.3 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecmaps_hk.py b/Lib/test/test_codecmaps_hk.py
index 2335c51..e7f7b96 100644
--- a/Lib/test/test_codecmaps_hk.py
+++ b/Lib/test/test_codecmaps_hk.py
@@ -3,7 +3,6 @@
# test_codecmaps_hk.py
# Codec mapping tests for HongKong encodings
#
-# $CJKCodecs: test_codecmaps_hk.py,v 1.1 2004/07/10 17:35:20 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecmaps_jp.py b/Lib/test/test_codecmaps_jp.py
index e75a5a8..08052d4 100644
--- a/Lib/test/test_codecmaps_jp.py
+++ b/Lib/test/test_codecmaps_jp.py
@@ -3,7 +3,6 @@
# test_codecmaps_jp.py
# Codec mapping tests for Japanese encodings
#
-# $CJKCodecs: test_codecmaps_jp.py,v 1.3 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecmaps_kr.py b/Lib/test/test_codecmaps_kr.py
index db65c01..7484a66 100644
--- a/Lib/test/test_codecmaps_kr.py
+++ b/Lib/test/test_codecmaps_kr.py
@@ -3,7 +3,6 @@
# test_codecmaps_kr.py
# Codec mapping tests for ROK encodings
#
-# $CJKCodecs: test_codecmaps_kr.py,v 1.3 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_codecmaps_tw.py b/Lib/test/test_codecmaps_tw.py
index 2d469b0..0b195f4 100644
--- a/Lib/test/test_codecmaps_tw.py
+++ b/Lib/test/test_codecmaps_tw.py
@@ -3,7 +3,6 @@
# test_codecmaps_tw.py
# Codec mapping tests for ROC encodings
#
-# $CJKCodecs: test_codecmaps_tw.py,v 1.3 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_compiler.py b/Lib/test/test_compiler.py
index a59d6aa..48f1643 100644
--- a/Lib/test/test_compiler.py
+++ b/Lib/test/test_compiler.py
@@ -26,6 +26,7 @@ class CompilerTest(unittest.TestCase):
next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL
print >>sys.__stdout__, \
' testCompileLibrary still working, be patient...'
+ sys.__stdout__.flush()
if not basename.endswith(".py"):
continue
@@ -55,6 +56,9 @@ class CompilerTest(unittest.TestCase):
def testYieldExpr(self):
compiler.compile("def g(): yield\n\n", "<string>", "exec")
+ def testDefaultArgs(self):
+ self.assertRaises(SyntaxError, compiler.parse, "def foo(a=1, b): pass")
+
def testLineNo(self):
# Test that all nodes except Module have a correct lineno attribute.
filename = __file__
diff --git a/Lib/test/test_contextlib.py b/Lib/test/test_contextlib.py
index 97470c7..2cf39ae 100644
--- a/Lib/test/test_contextlib.py
+++ b/Lib/test/test_contextlib.py
@@ -51,7 +51,7 @@ class ContextManagerTestCase(unittest.TestCase):
@contextmanager
def whee():
yield
- ctx = whee().__context__()
+ ctx = whee()
ctx.__enter__()
# Calling __exit__ should not result in an exception
self.failIf(ctx.__exit__(TypeError, TypeError("foo"), None))
@@ -63,7 +63,7 @@ class ContextManagerTestCase(unittest.TestCase):
yield
except:
yield
- ctx = whoo().__context__()
+ ctx = whoo()
ctx.__enter__()
self.assertRaises(
RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
@@ -146,6 +146,29 @@ class NestedTestCase(unittest.TestCase):
else:
self.fail("Didn't raise ZeroDivisionError")
+ def test_nested_right_exception(self):
+ state = []
+ @contextmanager
+ def a():
+ yield 1
+ class b(object):
+ def __enter__(self):
+ return 2
+ def __exit__(self, *exc_info):
+ try:
+ raise Exception()
+ except:
+ pass
+ try:
+ with nested(a(), b()) as (x, y):
+ 1/0
+ except ZeroDivisionError:
+ self.assertEqual((x, y), (1, 2))
+ except Exception:
+ self.fail("Reraised wrong exception")
+ else:
+ self.fail("Didn't raise ZeroDivisionError")
+
def test_nested_b_swallows(self):
@contextmanager
def a():
@@ -316,12 +339,12 @@ class DecimalContextTestCase(unittest.TestCase):
orig_context = ctx.copy()
try:
ctx.prec = save_prec = decimal.ExtendedContext.prec + 5
- with decimal.ExtendedContext:
+ with decimal.ExtendedContext.get_manager():
self.assertEqual(decimal.getcontext().prec,
decimal.ExtendedContext.prec)
self.assertEqual(decimal.getcontext().prec, save_prec)
try:
- with decimal.ExtendedContext:
+ with decimal.ExtendedContext.get_manager():
self.assertEqual(decimal.getcontext().prec,
decimal.ExtendedContext.prec)
1/0
diff --git a/Lib/test/test_cookielib.py b/Lib/test/test_cookielib.py
index 49e7d47..991506c 100644
--- a/Lib/test/test_cookielib.py
+++ b/Lib/test/test_cookielib.py
@@ -695,6 +695,22 @@ class CookieTests(TestCase):
'foo=bar; domain=friendly.org; Version="1"')
self.assertEquals(len(c), 0)
+ def test_strict_domain(self):
+ # Cookies whose domain is a country-code tld like .co.uk should
+ # not be set if CookiePolicy.strict_domain is true.
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ cp = DefaultCookiePolicy(strict_domain=True)
+ cj = CookieJar(policy=cp)
+ interact_netscape(cj, "http://example.co.uk/", 'no=problemo')
+ interact_netscape(cj, "http://example.co.uk/",
+ 'okey=dokey; Domain=.example.co.uk')
+ self.assertEquals(len(cj), 2)
+ for pseudo_tld in [".co.uk", ".org.za", ".tx.us", ".name.us"]:
+ interact_netscape(cj, "http://example.%s/" % pseudo_tld,
+ 'spam=eggs; Domain=.co.uk')
+ self.assertEquals(len(cj), 2)
+
def test_two_component_domain_ns(self):
# Netscape: .www.bar.com, www.bar.com, .bar.com, bar.com, no domain
# should all get accepted, as should .acme.com, acme.com and no domain
diff --git a/Lib/test/test_datetime.py b/Lib/test/test_datetime.py
index 2528b4a..203bea1 100644
--- a/Lib/test/test_datetime.py
+++ b/Lib/test/test_datetime.py
@@ -1400,6 +1400,12 @@ class TestDateTime(TestDate):
got = self.theclass.utcfromtimestamp(ts)
self.verify_field_equality(expected, got)
+ def test_microsecond_rounding(self):
+ # Test whether fromtimestamp "rounds up" floats that are less
+ # than one microsecond smaller than an integer.
+ self.assertEquals(self.theclass.fromtimestamp(0.9999999),
+ self.theclass.fromtimestamp(1))
+
def test_insane_fromtimestamp(self):
# It's possible that some platform maps time_t to double,
# and that this test will fail there. This test should
diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py
index b17607d..443c962 100644
--- a/Lib/test/test_doctest.py
+++ b/Lib/test/test_doctest.py
@@ -1079,6 +1079,25 @@ output to match any substring in the actual output:
... # doctest: +NORMALIZE_WHITESPACE
[0, 1, ..., 18, 19]
+The SKIP flag causes an example to be skipped entirely. I.e., the
+example is not run. It can be useful in contexts where doctest
+examples serve as both documentation and test cases, and an example
+should be included for documentation purposes, but should not be
+checked (e.g., because its output is random, or depends on resources
+which would be unavailable.) The SKIP flag can also be used for
+'commenting out' broken examples.
+
+ >>> import unavailable_resource # doctest: +SKIP
+ >>> unavailable_resource.do_something() # doctest: +SKIP
+ >>> unavailable_resource.blow_up() # doctest: +SKIP
+ Traceback (most recent call last):
+ ...
+ UncheckedBlowUpError: Nobody checks me.
+
+ >>> import random
+ >>> print random.random() # doctest: +SKIP
+ 0.721216923889
+
The REPORT_UDIFF flag causes failures that involve multi-line expected
and actual outputs to be displayed using a unified diff:
@@ -1281,6 +1300,26 @@ count as failures:
ValueError: 2
(3, 5)
+New option flags can also be registered, via register_optionflag(). Here
+we reach into doctest's internals a bit.
+
+ >>> unlikely = "UNLIKELY_OPTION_NAME"
+ >>> unlikely in doctest.OPTIONFLAGS_BY_NAME
+ False
+ >>> new_flag_value = doctest.register_optionflag(unlikely)
+ >>> unlikely in doctest.OPTIONFLAGS_BY_NAME
+ True
+
+Before 2.4.4/2.5, registering a name more than once erroneously created
+more than one flag value. Here we verify that's fixed:
+
+ >>> redundant_flag_value = doctest.register_optionflag(unlikely)
+ >>> redundant_flag_value == new_flag_value
+ True
+
+Clean up.
+ >>> del doctest.OPTIONFLAGS_BY_NAME[unlikely]
+
"""
def option_directives(): r"""
diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py
index 7946142..8f995f7 100644
--- a/Lib/test/test_exceptions.py
+++ b/Lib/test/test_exceptions.py
@@ -81,14 +81,6 @@ try: x = undefined_variable
except NameError: pass
r(OverflowError)
-# XXX
-# Obscure: in 2.2 and 2.3, this test relied on changing OverflowWarning
-# into an error, in order to trigger OverflowError. In 2.4, OverflowWarning
-# should no longer be generated, so the focus of the test shifts to showing
-# that OverflowError *isn't* generated. OverflowWarning should be gone
-# in Python 2.5, and then the filterwarnings() call, and this comment,
-# should go away.
-warnings.filterwarnings("error", "", OverflowWarning, __name__)
x = 1
for dummy in range(128):
x += x # this simply shouldn't blow up
@@ -224,3 +216,88 @@ if not sys.platform.startswith('java'):
test_capi3()
unlink(TESTFN)
+
+# test that exception attributes are happy.
+try: str(u'Hello \u00E1')
+except Exception, e: sampleUnicodeEncodeError = e
+try: unicode('\xff')
+except Exception, e: sampleUnicodeDecodeError = e
+exceptionList = [
+ ( BaseException, (), { 'message' : '', 'args' : () }),
+ ( BaseException, (1, ), { 'message' : 1, 'args' : ( 1, ) }),
+ ( BaseException, ('foo', ), { 'message' : 'foo', 'args' : ( 'foo', ) }),
+ ( BaseException, ('foo', 1), { 'message' : '', 'args' : ( 'foo', 1 ) }),
+ ( SystemExit, ('foo',), { 'message' : 'foo', 'args' : ( 'foo', ),
+ 'code' : 'foo' }),
+ ( IOError, ('foo',), { 'message' : 'foo', 'args' : ( 'foo', ), }),
+ ( IOError, ('foo', 'bar'), { 'message' : '',
+ 'args' : ('foo', 'bar'), }),
+ ( IOError, ('foo', 'bar', 'baz'),
+ { 'message' : '', 'args' : ('foo', 'bar'), }),
+ ( EnvironmentError, ('errnoStr', 'strErrorStr', 'filenameStr'),
+ { 'message' : '', 'args' : ('errnoStr', 'strErrorStr'),
+ 'strerror' : 'strErrorStr',
+ 'errno' : 'errnoStr', 'filename' : 'filenameStr' }),
+ ( EnvironmentError, (1, 'strErrorStr', 'filenameStr'),
+ { 'message' : '', 'args' : (1, 'strErrorStr'),
+ 'strerror' : 'strErrorStr', 'errno' : 1,
+ 'filename' : 'filenameStr' }),
+ ( SyntaxError, ('msgStr',),
+ { 'message' : 'msgStr', 'args' : ('msgStr', ),
+ 'print_file_and_line' : None, 'msg' : 'msgStr',
+ 'filename' : None, 'lineno' : None, 'offset' : None,
+ 'text' : None }),
+ ( SyntaxError, ('msgStr', ('filenameStr', 'linenoStr', 'offsetStr',
+ 'textStr')),
+ { 'message' : '', 'args' : ('msgStr', ('filenameStr',
+ 'linenoStr', 'offsetStr', 'textStr' )),
+ 'print_file_and_line' : None, 'msg' : 'msgStr',
+ 'filename' : 'filenameStr', 'lineno' : 'linenoStr',
+ 'offset' : 'offsetStr', 'text' : 'textStr' }),
+ ( SyntaxError, ('msgStr', 'filenameStr', 'linenoStr', 'offsetStr',
+ 'textStr', 'print_file_and_lineStr'),
+ { 'message' : '', 'args' : ('msgStr', 'filenameStr',
+ 'linenoStr', 'offsetStr', 'textStr',
+ 'print_file_and_lineStr'),
+ 'print_file_and_line' : None, 'msg' : 'msgStr',
+ 'filename' : None, 'lineno' : None, 'offset' : None,
+ 'text' : None }),
+ ( UnicodeError, (),
+ { 'message' : '', 'args' : (), }),
+ ( sampleUnicodeEncodeError,
+ { 'message' : '', 'args' : ('ascii', u'Hello \xe1', 6, 7,
+ 'ordinal not in range(128)'),
+ 'encoding' : 'ascii', 'object' : u'Hello \xe1',
+ 'start' : 6, 'reason' : 'ordinal not in range(128)' }),
+ ( sampleUnicodeDecodeError,
+ { 'message' : '', 'args' : ('ascii', '\xff', 0, 1,
+ 'ordinal not in range(128)'),
+ 'encoding' : 'ascii', 'object' : '\xff',
+ 'start' : 0, 'reason' : 'ordinal not in range(128)' }),
+ ( UnicodeTranslateError, (u"\u3042", 0, 1, "ouch"),
+ { 'message' : '', 'args' : (u'\u3042', 0, 1, 'ouch'),
+ 'object' : u'\u3042', 'reason' : 'ouch',
+ 'start' : 0, 'end' : 1 }),
+ ]
+try:
+ exceptionList.append(
+ ( WindowsError, (1, 'strErrorStr', 'filenameStr'),
+ { 'message' : '', 'args' : (1, 'strErrorStr'),
+ 'strerror' : 'strErrorStr',
+ 'errno' : 22, 'filename' : 'filenameStr',
+ 'winerror' : 1 }))
+except NameError: pass
+
+for args in exceptionList:
+ expected = args[-1]
+ try:
+ if len(args) == 2: raise args[0]
+ else: raise apply(args[0], args[1])
+ except BaseException, e:
+ for checkArgName in expected.keys():
+ if repr(getattr(e, checkArgName)) != repr(expected[checkArgName]):
+ raise TestFailed('Checking exception arguments, exception '
+ '"%s", attribute "%s" expected %s got %s.' %
+ ( repr(e), checkArgName,
+ repr(expected[checkArgName]),
+ repr(getattr(e, checkArgName)) ))
diff --git a/Lib/test/test_file.py b/Lib/test/test_file.py
index cfc1019..ca1c6ba 100644
--- a/Lib/test/test_file.py
+++ b/Lib/test/test_file.py
@@ -147,7 +147,7 @@ f.close()
bad_mode = "qwerty"
try:
open(TESTFN, bad_mode)
-except IOError, msg:
+except ValueError, msg:
if msg[0] != 0:
s = str(msg)
if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
diff --git a/Lib/test/test_grp.py b/Lib/test/test_grp.py
index 2c3ab29..08958ba 100755
--- a/Lib/test/test_grp.py
+++ b/Lib/test/test_grp.py
@@ -31,7 +31,10 @@ class GroupDatabaseTestCase(unittest.TestCase):
self.assertEqual(e2.gr_gid, e.gr_gid)
e2 = grp.getgrnam(e.gr_name)
self.check_value(e2)
- self.assertEqual(e2.gr_name, e.gr_name)
+ # There are instances where getgrall() returns group names in
+ # lowercase while getgrgid() returns proper casing.
+ # Discovered on Ubuntu 5.04 (custom).
+ self.assertEqual(e2.gr_name.lower(), e.gr_name.lower())
def test_errors(self):
self.assertRaises(TypeError, grp.getgrgid)
diff --git a/Lib/test/test_import.py b/Lib/test/test_import.py
index a72b8bd..effba3c 100644
--- a/Lib/test/test_import.py
+++ b/Lib/test/test_import.py
@@ -205,3 +205,20 @@ def test_import_name_binding():
assert y is test.test_support, y.__name__
test_import_name_binding()
+
+def test_import_initless_directory_warning():
+ import warnings
+ oldfilters = warnings.filters[:]
+ warnings.simplefilter('error', ImportWarning);
+ try:
+ # Just a random non-package directory we always expect to be
+ # somewhere in sys.path...
+ __import__("site-packages")
+ except ImportWarning:
+ pass
+ else:
+ raise AssertionError
+ finally:
+ warnings.filters = oldfilters
+
+test_import_initless_directory_warning()
diff --git a/Lib/test/test_importhooks.py b/Lib/test/test_importhooks.py
index 0693581..e8b4695 100644
--- a/Lib/test/test_importhooks.py
+++ b/Lib/test/test_importhooks.py
@@ -14,6 +14,7 @@ def get_file():
absimp = "import sub\n"
relimp = "from . import sub\n"
+deeprelimp = "from .... import sub\n"
futimp = "from __future__ import absolute_import\n"
reload_src = test_src+"""\
@@ -26,6 +27,7 @@ reload_co = compile(reload_src, "<???>", "exec")
test2_oldabs_co = compile(absimp + test_src, "<???>", "exec")
test2_newabs_co = compile(futimp + absimp + test_src, "<???>", "exec")
test2_newrel_co = compile(relimp + test_src, "<???>", "exec")
+test2_deeprel_co = compile(deeprelimp + test_src, "<???>", "exec")
test2_futrel_co = compile(futimp + relimp + test_src, "<???>", "exec")
test_path = "!!!_test_!!!"
@@ -46,10 +48,11 @@ class TestImporter:
"hooktestmodule": (False, test_co),
"hooktestpackage": (True, test_co),
"hooktestpackage.sub": (True, test_co),
- "hooktestpackage.sub.subber": (False, test_co),
+ "hooktestpackage.sub.subber": (True, test_co),
"hooktestpackage.oldabs": (False, test2_oldabs_co),
"hooktestpackage.newabs": (False, test2_newabs_co),
"hooktestpackage.newrel": (False, test2_newrel_co),
+ "hooktestpackage.sub.subber.subest": (True, test2_deeprel_co),
"hooktestpackage.futrel": (False, test2_futrel_co),
"sub": (False, test_co),
"reloadmodule": (False, test_co),
@@ -203,6 +206,12 @@ class ImportHooksTestCase(ImportHooksBaseTestCase):
self.assertEqual(hooktestpackage.newrel.sub,
hooktestpackage.sub)
+ import hooktestpackage.sub.subber.subest as subest
+ self.assertEqual(subest.get_name(),
+ "hooktestpackage.sub.subber.subest")
+ self.assertEqual(subest.sub,
+ hooktestpackage.sub)
+
import hooktestpackage.futrel
self.assertEqual(hooktestpackage.futrel.get_name(),
"hooktestpackage.futrel")
diff --git a/Lib/test/test_locale.py b/Lib/test/test_locale.py
index 1523e77..9e264b9 100644
--- a/Lib/test/test_locale.py
+++ b/Lib/test/test_locale.py
@@ -20,14 +20,14 @@ for tloc in tlocs:
else:
raise ImportError, "test locale not supported (tried %s)"%(', '.join(tlocs))
-def testformat(formatstr, value, grouping = 0, output=None):
+def testformat(formatstr, value, grouping = 0, output=None, func=locale.format):
if verbose:
if output:
print "%s %% %s =? %s ..." %\
(repr(formatstr), repr(value), repr(output)),
else:
print "%s %% %s works? ..." % (repr(formatstr), repr(value)),
- result = locale.format(formatstr, value, grouping = grouping)
+ result = func(formatstr, value, grouping = grouping)
if output and result != output:
if verbose:
print 'no'
@@ -49,6 +49,30 @@ try:
testformat("%-10.f", 4200, grouping=1, output='4%s200 ' % sep)
# Invoke getpreferredencoding to make sure it does not cause exceptions,
locale.getpreferredencoding()
+
+ # === Test format() with more complex formatting strings
+ # test if grouping is independent from other characters in formatting string
+ testformat("One million is %i", 1000000, grouping=1,
+ output='One million is 1%s000%s000' % (sep, sep),
+ func=locale.format_string)
+ testformat("One million is %i", 1000000, grouping=1,
+ output='One million is 1%s000%s000' % (sep, sep),
+ func=locale.format_string)
+ # test dots in formatting string
+ testformat(".%f.", 1000.0, output='.1000.000000.', func=locale.format_string)
+ # test floats
+ testformat("--> %10.2f", 1000.0, grouping=1, output='--> 1%s000.00' % sep,
+ func=locale.format_string)
+ # test asterisk formats
+ testformat("%10.*f", (2, 1000.0), grouping=0, output=' 1000.00',
+ func=locale.format_string)
+ testformat("%*.*f", (10, 2, 1000.0), grouping=1, output=' 1%s000.00' % sep,
+ func=locale.format_string)
+ # test more-in-one
+ testformat("int %i float %.2f str %s", (1000, 1000.0, 'str'), grouping=1,
+ output='int 1%s000 float 1%s000.00 str str' % (sep, sep),
+ func=locale.format_string)
+
finally:
locale.setlocale(locale.LC_NUMERIC, oldlocale)
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index b689dc8..73f8288 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -28,6 +28,7 @@ import select
import os, sys, string, struct, types, cPickle, cStringIO
import socket, tempfile, threading, time
import logging, logging.handlers, logging.config
+from test.test_support import run_with_locale
BANNER = "-- %-10s %-6s ---------------------------------------------------\n"
@@ -657,19 +658,11 @@ def test_main_inner():
pass
rootLogger.removeHandler(hdlr)
+# Set the locale to the platform-dependent default. I have no idea
+# why the test does this, but in any case we save the current locale
+# first and restore it at the end.
+@run_with_locale('LC_ALL', '')
def test_main():
- import locale
- # Set the locale to the platform-dependent default. I have no idea
- # why the test does this, but in any case we save the current locale
- # first so we can restore it at the end.
- try:
- original_locale = locale.setlocale(locale.LC_ALL)
- locale.setlocale(locale.LC_ALL, '')
- except (ValueError, locale.Error):
- # this happens on a Solaris box which only supports "C" locale
- # or a Mac OS X box which supports very little locale stuff at all
- original_locale = None
-
# Save and restore the original root logger level across the tests.
# Otherwise, e.g., if any test using cookielib runs after test_logging,
# cookielib's debug-level logger tries to log messages, leading to
@@ -681,8 +674,6 @@ def test_main():
try:
test_main_inner()
finally:
- if original_locale is not None:
- locale.setlocale(locale.LC_ALL, original_locale)
root_logger.setLevel(original_logging_level)
if __name__ == "__main__":
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
index 77d39a6..914a20c 100644
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -1,15 +1,1576 @@
-import mailbox
import os
import time
-import unittest
+import stat
+import socket
+import email
+import email.Message
+import rfc822
+import re
+import StringIO
from test import test_support
-
-# cleanup earlier tests
+import unittest
+import mailbox
+import glob
try:
- os.unlink(test_support.TESTFN)
-except os.error:
+ import fcntl
+except ImportError:
pass
+
+class TestBase(unittest.TestCase):
+
+ def _check_sample(self, msg):
+ # Inspect a mailbox.Message representation of the sample message
+ self.assert_(isinstance(msg, email.Message.Message))
+ self.assert_(isinstance(msg, mailbox.Message))
+ for key, value in _sample_headers.iteritems():
+ self.assert_(value in msg.get_all(key))
+ self.assert_(msg.is_multipart())
+ self.assert_(len(msg.get_payload()) == len(_sample_payloads))
+ for i, payload in enumerate(_sample_payloads):
+ part = msg.get_payload(i)
+ self.assert_(isinstance(part, email.Message.Message))
+ self.assert_(not isinstance(part, mailbox.Message))
+ self.assert_(part.get_payload() == payload)
+
+ def _delete_recursively(self, target):
+ # Delete a file or delete a directory recursively
+ if os.path.isdir(target):
+ for path, dirs, files in os.walk(target, topdown=False):
+ for name in files:
+ os.remove(os.path.join(path, name))
+ for name in dirs:
+ os.rmdir(os.path.join(path, name))
+ os.rmdir(target)
+ elif os.path.exists(target):
+ os.remove(target)
+
+
+class TestMailbox(TestBase):
+
+ _factory = None # Overridden by subclasses to reuse tests
+ _template = 'From: foo\n\n%s'
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+ self._box = self._factory(self._path)
+
+ def tearDown(self):
+ self._box.close()
+ self._delete_recursively(self._path)
+
+ def test_add(self):
+ # Add copies of a sample message
+ keys = []
+ keys.append(self._box.add(self._template % 0))
+ self.assert_(len(self._box) == 1)
+ keys.append(self._box.add(mailbox.Message(_sample_message)))
+ self.assert_(len(self._box) == 2)
+ keys.append(self._box.add(email.message_from_string(_sample_message)))
+ self.assert_(len(self._box) == 3)
+ keys.append(self._box.add(StringIO.StringIO(_sample_message)))
+ self.assert_(len(self._box) == 4)
+ keys.append(self._box.add(_sample_message))
+ self.assert_(len(self._box) == 5)
+ self.assert_(self._box.get_string(keys[0]) == self._template % 0)
+ for i in (1, 2, 3, 4):
+ self._check_sample(self._box[keys[i]])
+
+ def test_remove(self):
+ # Remove messages using remove()
+ self._test_remove_or_delitem(self._box.remove)
+
+ def test_delitem(self):
+ # Remove messages using __delitem__()
+ self._test_remove_or_delitem(self._box.__delitem__)
+
+ def _test_remove_or_delitem(self, method):
+ # (Used by test_remove() and test_delitem().)
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(len(self._box) == 2)
+ method(key0)
+ l = len(self._box)
+ self.assert_(l == 1, "actual l: %s" % l)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+ self.assertRaises(KeyError, lambda: method(key0))
+ self.assert_(self._box.get_string(key1) == self._template % 1)
+ key2 = self._box.add(self._template % 2)
+ self.assert_(len(self._box) == 2)
+ method(key2)
+ l = len(self._box)
+ self.assert_(l == 1, "actual l: %s" % l)
+ self.assertRaises(KeyError, lambda: self._box[key2])
+ self.assertRaises(KeyError, lambda: method(key2))
+ self.assert_(self._box.get_string(key1) == self._template % 1)
+ method(key1)
+ self.assert_(len(self._box) == 0)
+ self.assertRaises(KeyError, lambda: self._box[key1])
+ self.assertRaises(KeyError, lambda: method(key1))
+
+ def test_discard(self, repetitions=10):
+ # Discard messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(len(self._box) == 2)
+ self._box.discard(key0)
+ self.assert_(len(self._box) == 1)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+ self._box.discard(key0)
+ self.assert_(len(self._box) == 1)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+
+ def test_get(self):
+ # Retrieve messages using get()
+ key0 = self._box.add(self._template % 0)
+ msg = self._box.get(key0)
+ self.assert_(msg['from'] == 'foo')
+ self.assert_(msg.get_payload() == '0')
+ self.assert_(self._box.get('foo') is None)
+ self.assert_(self._box.get('foo', False) is False)
+ self._box.close()
+ self._box = self._factory(self._path, factory=rfc822.Message)
+ key1 = self._box.add(self._template % 1)
+ msg = self._box.get(key1)
+ self.assert_(msg['from'] == 'foo')
+ self.assert_(msg.fp.read() == '1')
+
+ def test_getitem(self):
+ # Retrieve message using __getitem__()
+ key0 = self._box.add(self._template % 0)
+ msg = self._box[key0]
+ self.assert_(msg['from'] == 'foo')
+ self.assert_(msg.get_payload() == '0')
+ self.assertRaises(KeyError, lambda: self._box['foo'])
+ self._box.discard(key0)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+
+ def test_get_message(self):
+ # Get Message representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ msg0 = self._box.get_message(key0)
+ self.assert_(isinstance(msg0, mailbox.Message))
+ self.assert_(msg0['from'] == 'foo')
+ self.assert_(msg0.get_payload() == '0')
+ self._check_sample(self._box.get_message(key1))
+
+ def test_get_string(self):
+ # Get string representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ self.assert_(self._box.get_string(key0) == self._template % 0)
+ self.assert_(self._box.get_string(key1) == _sample_message)
+
+ def test_get_file(self):
+ # Get file representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ self.assert_(self._box.get_file(key0).read().replace(os.linesep, '\n')
+ == self._template % 0)
+ self.assert_(self._box.get_file(key1).read().replace(os.linesep, '\n')
+ == _sample_message)
+
+ def test_iterkeys(self):
+ # Get keys using iterkeys()
+ self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
+
+ def test_keys(self):
+ # Get keys using keys()
+ self._check_iteration(self._box.keys, do_keys=True, do_values=False)
+
+ def test_itervalues(self):
+ # Get values using itervalues()
+ self._check_iteration(self._box.itervalues, do_keys=False,
+ do_values=True)
+
+ def test_iter(self):
+ # Get values using __iter__()
+ self._check_iteration(self._box.__iter__, do_keys=False,
+ do_values=True)
+
+ def test_values(self):
+ # Get values using values()
+ self._check_iteration(self._box.values, do_keys=False, do_values=True)
+
+ def test_iteritems(self):
+ # Get keys and values using iteritems()
+ self._check_iteration(self._box.iteritems, do_keys=True,
+ do_values=True)
+
+ def test_items(self):
+ # Get keys and values using items()
+ self._check_iteration(self._box.items, do_keys=True, do_values=True)
+
+ def _check_iteration(self, method, do_keys, do_values, repetitions=10):
+ for value in method():
+ self.fail("Not empty")
+ keys, values = [], []
+ for i in xrange(repetitions):
+ keys.append(self._box.add(self._template % i))
+ values.append(self._template % i)
+ if do_keys and not do_values:
+ returned_keys = list(method())
+ elif do_values and not do_keys:
+ returned_values = list(method())
+ else:
+ returned_keys, returned_values = [], []
+ for key, value in method():
+ returned_keys.append(key)
+ returned_values.append(value)
+ if do_keys:
+ self.assert_(len(keys) == len(returned_keys))
+ self.assert_(set(keys) == set(returned_keys))
+ if do_values:
+ count = 0
+ for value in returned_values:
+ self.assert_(value['from'] == 'foo')
+ self.assert_(int(value.get_payload()) < repetitions)
+ count += 1
+ self.assert_(len(values) == count)
+
+ def test_has_key(self):
+ # Check existence of keys using has_key()
+ self._test_has_key_or_contains(self._box.has_key)
+
+ def test_contains(self):
+ # Check existence of keys using __contains__()
+ self._test_has_key_or_contains(self._box.__contains__)
+
+ def _test_has_key_or_contains(self, method):
+ # (Used by test_has_key() and test_contains().)
+ self.assert_(not method('foo'))
+ key0 = self._box.add(self._template % 0)
+ self.assert_(method(key0))
+ self.assert_(not method('foo'))
+ key1 = self._box.add(self._template % 1)
+ self.assert_(method(key1))
+ self.assert_(method(key0))
+ self.assert_(not method('foo'))
+ self._box.remove(key0)
+ self.assert_(not method(key0))
+ self.assert_(method(key1))
+ self.assert_(not method('foo'))
+ self._box.remove(key1)
+ self.assert_(not method(key1))
+ self.assert_(not method(key0))
+ self.assert_(not method('foo'))
+
+ def test_len(self, repetitions=10):
+ # Get message count
+ keys = []
+ for i in xrange(repetitions):
+ self.assert_(len(self._box) == i)
+ keys.append(self._box.add(self._template % i))
+ self.assert_(len(self._box) == i + 1)
+ for i in xrange(repetitions):
+ self.assert_(len(self._box) == repetitions - i)
+ self._box.remove(keys[i])
+ self.assert_(len(self._box) == repetitions - i - 1)
+
+ def test_set_item(self):
+ # Modify messages using __setitem__()
+ key0 = self._box.add(self._template % 'original 0')
+ self.assert_(self._box.get_string(key0) == \
+ self._template % 'original 0')
+ key1 = self._box.add(self._template % 'original 1')
+ self.assert_(self._box.get_string(key1) == \
+ self._template % 'original 1')
+ self._box[key0] = self._template % 'changed 0'
+ self.assert_(self._box.get_string(key0) == \
+ self._template % 'changed 0')
+ self._box[key1] = self._template % 'changed 1'
+ self.assert_(self._box.get_string(key1) == \
+ self._template % 'changed 1')
+ self._box[key0] = _sample_message
+ self._check_sample(self._box[key0])
+ self._box[key1] = self._box[key0]
+ self._check_sample(self._box[key1])
+ self._box[key0] = self._template % 'original 0'
+ self.assert_(self._box.get_string(key0) ==
+ self._template % 'original 0')
+ self._check_sample(self._box[key1])
+ self.assertRaises(KeyError,
+ lambda: self._box.__setitem__('foo', 'bar'))
+ self.assertRaises(KeyError, lambda: self._box['foo'])
+ self.assert_(len(self._box) == 2)
+
+ def test_clear(self, iterations=10):
+ # Remove all messages using clear()
+ keys = []
+ for i in xrange(iterations):
+ self._box.add(self._template % i)
+ for i, key in enumerate(keys):
+ self.assert_(self._box.get_string(key) == self._template % i)
+ self._box.clear()
+ self.assert_(len(self._box) == 0)
+ for i, key in enumerate(keys):
+ self.assertRaises(KeyError, lambda: self._box.get_string(key))
+
+ def test_pop(self):
+ # Get and remove a message using pop()
+ key0 = self._box.add(self._template % 0)
+ self.assert_(key0 in self._box)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(key1 in self._box)
+ self.assert_(self._box.pop(key0).get_payload() == '0')
+ self.assert_(key0 not in self._box)
+ self.assert_(key1 in self._box)
+ key2 = self._box.add(self._template % 2)
+ self.assert_(key2 in self._box)
+ self.assert_(self._box.pop(key2).get_payload() == '2')
+ self.assert_(key2 not in self._box)
+ self.assert_(key1 in self._box)
+ self.assert_(self._box.pop(key1).get_payload() == '1')
+ self.assert_(key1 not in self._box)
+ self.assert_(len(self._box) == 0)
+
+ def test_popitem(self, iterations=10):
+ # Get and remove an arbitrary (key, message) using popitem()
+ keys = []
+ for i in xrange(10):
+ keys.append(self._box.add(self._template % i))
+ seen = []
+ for i in xrange(10):
+ key, msg = self._box.popitem()
+ self.assert_(key in keys)
+ self.assert_(key not in seen)
+ seen.append(key)
+ self.assert_(int(msg.get_payload()) == keys.index(key))
+ self.assert_(len(self._box) == 0)
+ for key in keys:
+ self.assertRaises(KeyError, lambda: self._box[key])
+
+ def test_update(self):
+ # Modify multiple messages using update()
+ key0 = self._box.add(self._template % 'original 0')
+ key1 = self._box.add(self._template % 'original 1')
+ key2 = self._box.add(self._template % 'original 2')
+ self._box.update({key0: self._template % 'changed 0',
+ key2: _sample_message})
+ self.assert_(len(self._box) == 3)
+ self.assert_(self._box.get_string(key0) ==
+ self._template % 'changed 0')
+ self.assert_(self._box.get_string(key1) ==
+ self._template % 'original 1')
+ self._check_sample(self._box[key2])
+ self._box.update([(key2, self._template % 'changed 2'),
+ (key1, self._template % 'changed 1'),
+ (key0, self._template % 'original 0')])
+ self.assert_(len(self._box) == 3)
+ self.assert_(self._box.get_string(key0) ==
+ self._template % 'original 0')
+ self.assert_(self._box.get_string(key1) ==
+ self._template % 'changed 1')
+ self.assert_(self._box.get_string(key2) ==
+ self._template % 'changed 2')
+ self.assertRaises(KeyError,
+ lambda: self._box.update({'foo': 'bar',
+ key0: self._template % "changed 0"}))
+ self.assert_(len(self._box) == 3)
+ self.assert_(self._box.get_string(key0) ==
+ self._template % "changed 0")
+ self.assert_(self._box.get_string(key1) ==
+ self._template % "changed 1")
+ self.assert_(self._box.get_string(key2) ==
+ self._template % "changed 2")
+
+ def test_flush(self):
+ # Write changes to disk
+ self._test_flush_or_close(self._box.flush)
+
+ def test_lock_unlock(self):
+ # Lock and unlock the mailbox
+ self.assert_(not os.path.exists(self._get_lock_path()))
+ self._box.lock()
+ self.assert_(os.path.exists(self._get_lock_path()))
+ self._box.unlock()
+ self.assert_(not os.path.exists(self._get_lock_path()))
+
+ def test_close(self):
+ # Close mailbox and flush changes to disk
+ self._test_flush_or_close(self._box.close)
+
+ def _test_flush_or_close(self, method):
+ contents = [self._template % i for i in xrange(3)]
+ self._box.add(contents[0])
+ self._box.add(contents[1])
+ self._box.add(contents[2])
+ method()
+ self._box = self._factory(self._path)
+ keys = self._box.keys()
+ self.assert_(len(keys) == 3)
+ for key in keys:
+ self.assert_(self._box.get_string(key) in contents)
+
+ def test_dump_message(self):
+ # Write message representations to disk
+ for input in (email.message_from_string(_sample_message),
+ _sample_message, StringIO.StringIO(_sample_message)):
+ output = StringIO.StringIO()
+ self._box._dump_message(input, output)
+ self.assert_(output.getvalue() ==
+ _sample_message.replace('\n', os.linesep))
+ output = StringIO.StringIO()
+ self.assertRaises(TypeError,
+ lambda: self._box._dump_message(None, output))
+
+ def _get_lock_path(self):
+ # Return the path of the dot lock file. May be overridden.
+ return self._path + '.lock'
+
+
+class TestMailboxSuperclass(TestBase):
+
+ def test_notimplemented(self):
+ # Test that all Mailbox methods raise NotImplementedException.
+ box = mailbox.Mailbox('path')
+ self.assertRaises(NotImplementedError, lambda: box.add(''))
+ self.assertRaises(NotImplementedError, lambda: box.remove(''))
+ self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
+ self.assertRaises(NotImplementedError, lambda: box.discard(''))
+ self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
+ self.assertRaises(NotImplementedError, lambda: box.iterkeys())
+ self.assertRaises(NotImplementedError, lambda: box.keys())
+ self.assertRaises(NotImplementedError, lambda: box.itervalues().next())
+ self.assertRaises(NotImplementedError, lambda: box.__iter__().next())
+ self.assertRaises(NotImplementedError, lambda: box.values())
+ self.assertRaises(NotImplementedError, lambda: box.iteritems().next())
+ self.assertRaises(NotImplementedError, lambda: box.items())
+ self.assertRaises(NotImplementedError, lambda: box.get(''))
+ self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_message(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_string(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_file(''))
+ self.assertRaises(NotImplementedError, lambda: box.has_key(''))
+ self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
+ self.assertRaises(NotImplementedError, lambda: box.__len__())
+ self.assertRaises(NotImplementedError, lambda: box.clear())
+ self.assertRaises(NotImplementedError, lambda: box.pop(''))
+ self.assertRaises(NotImplementedError, lambda: box.popitem())
+ self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
+ self.assertRaises(NotImplementedError, lambda: box.flush())
+ self.assertRaises(NotImplementedError, lambda: box.lock())
+ self.assertRaises(NotImplementedError, lambda: box.unlock())
+ self.assertRaises(NotImplementedError, lambda: box.close())
+
+
+class TestMaildir(TestMailbox):
+
+ _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
+
+ def setUp(self):
+ TestMailbox.setUp(self)
+ if os.name == 'nt':
+ self._box.colon = '!'
+
+ def test_add_MM(self):
+ # Add a MaildirMessage instance
+ msg = mailbox.MaildirMessage(self._template % 0)
+ msg.set_subdir('cur')
+ msg.set_info('foo')
+ key = self._box.add(msg)
+ self.assert_(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
+ (key, self._box.colon))))
+
+ def test_get_MM(self):
+ # Get a MaildirMessage instance
+ msg = mailbox.MaildirMessage(self._template % 0)
+ msg.set_subdir('cur')
+ msg.set_flags('RF')
+ key = self._box.add(msg)
+ msg_returned = self._box.get_message(key)
+ self.assert_(isinstance(msg_returned, mailbox.MaildirMessage))
+ self.assert_(msg_returned.get_subdir() == 'cur')
+ self.assert_(msg_returned.get_flags() == 'FR')
+
+ def test_set_MM(self):
+ # Set with a MaildirMessage instance
+ msg0 = mailbox.MaildirMessage(self._template % 0)
+ msg0.set_flags('TP')
+ key = self._box.add(msg0)
+ msg_returned = self._box.get_message(key)
+ self.assert_(msg_returned.get_subdir() == 'new')
+ self.assert_(msg_returned.get_flags() == 'PT')
+ msg1 = mailbox.MaildirMessage(self._template % 1)
+ self._box[key] = msg1
+ msg_returned = self._box.get_message(key)
+ self.assert_(msg_returned.get_subdir() == 'new')
+ self.assert_(msg_returned.get_flags() == '')
+ self.assert_(msg_returned.get_payload() == '1')
+ msg2 = mailbox.MaildirMessage(self._template % 2)
+ msg2.set_info('2,S')
+ self._box[key] = msg2
+ self._box[key] = self._template % 3
+ msg_returned = self._box.get_message(key)
+ self.assert_(msg_returned.get_subdir() == 'new')
+ self.assert_(msg_returned.get_flags() == 'S')
+ self.assert_(msg_returned.get_payload() == '3')
+
+ def test_initialize_new(self):
+ # Initialize a non-existent mailbox
+ self.tearDown()
+ self._box = mailbox.Maildir(self._path)
+ self._check_basics(factory=rfc822.Message)
+ self._delete_recursively(self._path)
+ self._box = self._factory(self._path, factory=None)
+ self._check_basics()
+
+ def test_initialize_existing(self):
+ # Initialize an existing mailbox
+ self.tearDown()
+ for subdir in '', 'tmp', 'new', 'cur':
+ os.mkdir(os.path.join(self._path, subdir))
+ self._box = mailbox.Maildir(self._path)
+ self._check_basics(factory=rfc822.Message)
+ self._box = mailbox.Maildir(self._path, factory=None)
+ self._check_basics()
+
+ def _check_basics(self, factory=None):
+ # (Used by test_open_new() and test_open_existing().)
+ self.assertEqual(self._box._path, os.path.abspath(self._path))
+ self.assertEqual(self._box._factory, factory)
+ for subdir in '', 'tmp', 'new', 'cur':
+ path = os.path.join(self._path, subdir)
+ mode = os.stat(path)[stat.ST_MODE]
+ self.assert_(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
+
+ def test_list_folders(self):
+ # List folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 3)
+ self.assert_(set(self._box.list_folders()) ==
+ set(('one', 'two', 'three')))
+
+ def test_get_folder(self):
+ # Open folders
+ self._box.add_folder('foo.bar')
+ folder0 = self._box.get_folder('foo.bar')
+ folder0.add(self._template % 'bar')
+ self.assert_(os.path.isdir(os.path.join(self._path, '.foo.bar')))
+ folder1 = self._box.get_folder('foo.bar')
+ self.assert_(folder1.get_string(folder1.keys()[0]) == \
+ self._template % 'bar')
+
+ def test_add_and_remove_folders(self):
+ # Delete folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('one', 'two')))
+ self._box.remove_folder('one')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('two', 'three')))
+ self._box.remove_folder('three')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.remove_folder('two')
+ self.assert_(len(self._box.list_folders()) == 0)
+ self.assert_(self._box.list_folders() == [])
+
+ def test_clean(self):
+ # Remove old files from 'tmp'
+ foo_path = os.path.join(self._path, 'tmp', 'foo')
+ bar_path = os.path.join(self._path, 'tmp', 'bar')
+ f = open(foo_path, 'w')
+ f.write("@")
+ f.close()
+ f = open(bar_path, 'w')
+ f.write("@")
+ f.close()
+ self._box.clean()
+ self.assert_(os.path.exists(foo_path))
+ self.assert_(os.path.exists(bar_path))
+ foo_stat = os.stat(foo_path)
+ os.utime(foo_path, (time.time() - 129600 - 2,
+ foo_stat.st_mtime))
+ self._box.clean()
+ self.assert_(not os.path.exists(foo_path))
+ self.assert_(os.path.exists(bar_path))
+
+ def test_create_tmp(self, repetitions=10):
+ # Create files in tmp directory
+ hostname = socket.gethostname()
+ if '/' in hostname:
+ hostname = hostname.replace('/', r'\057')
+ if ':' in hostname:
+ hostname = hostname.replace(':', r'\072')
+ pid = os.getpid()
+ pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
+ r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
+ previous_groups = None
+ for x in xrange(repetitions):
+ tmp_file = self._box._create_tmp()
+ head, tail = os.path.split(tmp_file.name)
+ self.assertEqual(head, os.path.abspath(os.path.join(self._path,
+ "tmp")),
+ "File in wrong location: '%s'" % head)
+ match = pattern.match(tail)
+ self.assert_(match != None, "Invalid file name: '%s'" % tail)
+ groups = match.groups()
+ if previous_groups != None:
+ self.assert_(int(groups[0] >= previous_groups[0]),
+ "Non-monotonic seconds: '%s' before '%s'" %
+ (previous_groups[0], groups[0]))
+ self.assert_(int(groups[1] >= previous_groups[1]) or
+ groups[0] != groups[1],
+ "Non-monotonic milliseconds: '%s' before '%s'" %
+ (previous_groups[1], groups[1]))
+ self.assert_(int(groups[2]) == pid,
+ "Process ID mismatch: '%s' should be '%s'" %
+ (groups[2], pid))
+ self.assert_(int(groups[3]) == int(previous_groups[3]) + 1,
+ "Non-sequential counter: '%s' before '%s'" %
+ (previous_groups[3], groups[3]))
+ self.assert_(groups[4] == hostname,
+ "Host name mismatch: '%s' should be '%s'" %
+ (groups[4], hostname))
+ previous_groups = groups
+ tmp_file.write(_sample_message)
+ tmp_file.seek(0)
+ self.assert_(tmp_file.read() == _sample_message)
+ tmp_file.close()
+ file_count = len(os.listdir(os.path.join(self._path, "tmp")))
+ self.assert_(file_count == repetitions,
+ "Wrong file count: '%s' should be '%s'" %
+ (file_count, repetitions))
+
+ def test_refresh(self):
+ # Update the table of contents
+ self.assert_(self._box._toc == {})
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(self._box._toc == {})
+ self._box._refresh()
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0),
+ key1: os.path.join('new', key1)})
+ key2 = self._box.add(self._template % 2)
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0),
+ key1: os.path.join('new', key1)})
+ self._box._refresh()
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0),
+ key1: os.path.join('new', key1),
+ key2: os.path.join('new', key2)})
+
+ def test_lookup(self):
+ # Look up message subpaths in the TOC
+ self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
+ key0 = self._box.add(self._template % 0)
+ self.assert_(self._box._lookup(key0) == os.path.join('new', key0))
+ os.remove(os.path.join(self._path, 'new', key0))
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0)})
+ self.assertRaises(KeyError, lambda: self._box._lookup(key0))
+ self.assert_(self._box._toc == {})
+
+ def test_lock_unlock(self):
+ # Lock and unlock the mailbox. For Maildir, this does nothing.
+ self._box.lock()
+ self._box.unlock()
+
+
+class _TestMboxMMDF(TestMailbox):
+
+ def tearDown(self):
+ self._box.close()
+ self._delete_recursively(self._path)
+ for lock_remnant in glob.glob(self._path + '.*'):
+ os.remove(lock_remnant)
+
+ def test_add_from_string(self):
+ # Add a string starting with 'From ' to the mailbox
+ key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
+ self.assert_(self._box[key].get_from() == 'foo@bar blah')
+ self.assert_(self._box[key].get_payload() == '0')
+
+ def test_add_mbox_or_mmdf_message(self):
+ # Add an mboxMessage or MMDFMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg = class_('From foo@bar blah\nFrom: foo\n\n0')
+ key = self._box.add(msg)
+
+ def test_open_close_open(self):
+ # Open and inspect previously-created mailbox
+ values = [self._template % i for i in xrange(3)]
+ for value in values:
+ self._box.add(value)
+ self._box.close()
+ mtime = os.path.getmtime(self._path)
+ self._box = self._factory(self._path)
+ self.assert_(len(self._box) == 3)
+ for key in self._box.iterkeys():
+ self.assert_(self._box.get_string(key) in values)
+ self._box.close()
+ self.assert_(mtime == os.path.getmtime(self._path))
+
+ def test_add_and_close(self):
+ # Verifying that closing a mailbox doesn't change added items
+ self._box.add(_sample_message)
+ for i in xrange(3):
+ self._box.add(self._template % i)
+ self._box.add(_sample_message)
+ self._box._file.flush()
+ self._box._file.seek(0)
+ contents = self._box._file.read()
+ self._box.close()
+ self.assert_(contents == open(self._path, 'rb').read())
+ self._box = self._factory(self._path)
+
+
+class TestMbox(_TestMboxMMDF):
+
+ _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
+
+
+class TestMMDF(_TestMboxMMDF):
+
+ _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
+
+
+class TestMH(TestMailbox):
+
+ _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
+
+ def test_list_folders(self):
+ # List folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 3)
+ self.assert_(set(self._box.list_folders()) ==
+ set(('one', 'two', 'three')))
+
+ def test_get_folder(self):
+ # Open folders
+ self._box.add_folder('foo.bar')
+ folder0 = self._box.get_folder('foo.bar')
+ folder0.add(self._template % 'bar')
+ self.assert_(os.path.isdir(os.path.join(self._path, 'foo.bar')))
+ folder1 = self._box.get_folder('foo.bar')
+ self.assert_(folder1.get_string(folder1.keys()[0]) == \
+ self._template % 'bar')
+
+ def test_add_and_remove_folders(self):
+ # Delete folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('one', 'two')))
+ self._box.remove_folder('one')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('two', 'three')))
+ self._box.remove_folder('three')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.remove_folder('two')
+ self.assert_(len(self._box.list_folders()) == 0)
+ self.assert_(self._box.list_folders() == [])
+
+ def test_sequences(self):
+ # Get and set sequences
+ self.assert_(self._box.get_sequences() == {})
+ msg0 = mailbox.MHMessage(self._template % 0)
+ msg0.add_sequence('foo')
+ key0 = self._box.add(msg0)
+ self.assert_(self._box.get_sequences() == {'foo':[key0]})
+ msg1 = mailbox.MHMessage(self._template % 1)
+ msg1.set_sequences(['bar', 'replied', 'foo'])
+ key1 = self._box.add(msg1)
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
+ msg0.set_sequences(['flagged'])
+ self._box[key0] = msg0
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key1], 'bar':[key1], 'replied':[key1],
+ 'flagged':[key0]})
+ self._box.remove(key1)
+ self.assert_(self._box.get_sequences() == {'flagged':[key0]})
+
+ def test_pack(self):
+ # Pack the contents of the mailbox
+ msg0 = mailbox.MHMessage(self._template % 0)
+ msg1 = mailbox.MHMessage(self._template % 1)
+ msg2 = mailbox.MHMessage(self._template % 2)
+ msg3 = mailbox.MHMessage(self._template % 3)
+ msg0.set_sequences(['foo', 'unseen'])
+ msg1.set_sequences(['foo'])
+ msg2.set_sequences(['foo', 'flagged'])
+ msg3.set_sequences(['foo', 'bar', 'replied'])
+ key0 = self._box.add(msg0)
+ key1 = self._box.add(msg1)
+ key2 = self._box.add(msg2)
+ key3 = self._box.add(msg3)
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key0,key1,key2,key3], 'unseen':[key0],
+ 'flagged':[key2], 'bar':[key3], 'replied':[key3]})
+ self._box.remove(key2)
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
+ 'replied':[key3]})
+ self._box.pack()
+ self.assert_(self._box.keys() == [1, 2, 3])
+ key0 = key0
+ key1 = key0 + 1
+ key2 = key1 + 1
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
+
+ def _get_lock_path(self):
+ return os.path.join(self._path, '.mh_sequences.lock')
+
+
+class TestBabyl(TestMailbox):
+
+ _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
+
+ def tearDown(self):
+ self._box.close()
+ self._delete_recursively(self._path)
+ for lock_remnant in glob.glob(self._path + '.*'):
+ os.remove(lock_remnant)
+
+ def test_labels(self):
+ # Get labels from the mailbox
+ self.assert_(self._box.get_labels() == [])
+ msg0 = mailbox.BabylMessage(self._template % 0)
+ msg0.add_label('foo')
+ key0 = self._box.add(msg0)
+ self.assert_(self._box.get_labels() == ['foo'])
+ msg1 = mailbox.BabylMessage(self._template % 1)
+ msg1.set_labels(['bar', 'answered', 'foo'])
+ key1 = self._box.add(msg1)
+ self.assert_(set(self._box.get_labels()) == set(['foo', 'bar']))
+ msg0.set_labels(['blah', 'filed'])
+ self._box[key0] = msg0
+ self.assert_(set(self._box.get_labels()) ==
+ set(['foo', 'bar', 'blah']))
+ self._box.remove(key1)
+ self.assert_(set(self._box.get_labels()) == set(['blah']))
+
+
+class TestMessage(TestBase):
+
+ _factory = mailbox.Message # Overridden by subclasses to reuse tests
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+
+ def tearDown(self):
+ self._delete_recursively(self._path)
+
+ def test_initialize_with_eMM(self):
+ # Initialize based on email.Message.Message instance
+ eMM = email.message_from_string(_sample_message)
+ msg = self._factory(eMM)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+
+ def test_initialize_with_string(self):
+ # Initialize based on string
+ msg = self._factory(_sample_message)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+
+ def test_initialize_with_file(self):
+ # Initialize based on contents of file
+ f = open(self._path, 'w+')
+ f.write(_sample_message)
+ f.seek(0)
+ msg = self._factory(f)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+ f.close()
+
+ def test_initialize_with_nothing(self):
+ # Initialize without arguments
+ msg = self._factory()
+ self._post_initialize_hook(msg)
+ self.assert_(isinstance(msg, email.Message.Message))
+ self.assert_(isinstance(msg, mailbox.Message))
+ self.assert_(isinstance(msg, self._factory))
+ self.assert_(msg.keys() == [])
+ self.assert_(not msg.is_multipart())
+ self.assert_(msg.get_payload() == None)
+
+ def test_initialize_incorrectly(self):
+ # Initialize with invalid argument
+ self.assertRaises(TypeError, lambda: self._factory(object()))
+
+ def test_become_message(self):
+ # Take on the state of another message
+ eMM = email.message_from_string(_sample_message)
+ msg = self._factory()
+ msg._become_message(eMM)
+ self._check_sample(msg)
+
+ def test_explain_to(self):
+ # Copy self's format-specific data to other message formats.
+ # This test is superficial; better ones are in TestMessageConversion.
+ msg = self._factory()
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ other_msg = class_()
+ msg._explain_to(other_msg)
+ other_msg = email.Message.Message()
+ self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
+
+ def _post_initialize_hook(self, msg):
+ # Overridden by subclasses to check extra things after initialization
+ pass
+
+
+class TestMaildirMessage(TestMessage):
+
+ _factory = mailbox.MaildirMessage
+
+ def _post_initialize_hook(self, msg):
+ self.assert_(msg._subdir == 'new')
+ self.assert_(msg._info == '')
+
+ def test_subdir(self):
+ # Use get_subdir() and set_subdir()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_subdir() == 'new')
+ msg.set_subdir('cur')
+ self.assert_(msg.get_subdir() == 'cur')
+ msg.set_subdir('new')
+ self.assert_(msg.get_subdir() == 'new')
+ self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
+ self.assert_(msg.get_subdir() == 'new')
+ msg.set_subdir('new')
+ self.assert_(msg.get_subdir() == 'new')
+ self._check_sample(msg)
+
+ def test_flags(self):
+ # Use get_flags(), set_flags(), add_flag(), remove_flag()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_flags() == '')
+ self.assert_(msg.get_subdir() == 'new')
+ msg.set_flags('F')
+ self.assert_(msg.get_subdir() == 'new')
+ self.assert_(msg.get_flags() == 'F')
+ msg.set_flags('SDTP')
+ self.assert_(msg.get_flags() == 'DPST')
+ msg.add_flag('FT')
+ self.assert_(msg.get_flags() == 'DFPST')
+ msg.remove_flag('TDRP')
+ self.assert_(msg.get_flags() == 'FS')
+ self.assert_(msg.get_subdir() == 'new')
+ self._check_sample(msg)
+
+ def test_date(self):
+ # Use get_date() and set_date()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(abs(msg.get_date() - time.time()) < 60)
+ msg.set_date(0.0)
+ self.assert_(msg.get_date() == 0.0)
+
+ def test_info(self):
+ # Use get_info() and set_info()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_info() == '')
+ msg.set_info('1,foo=bar')
+ self.assert_(msg.get_info() == '1,foo=bar')
+ self.assertRaises(TypeError, lambda: msg.set_info(None))
+ self._check_sample(msg)
+
+ def test_info_and_flags(self):
+ # Test interaction of info and flag methods
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_info() == '')
+ msg.set_flags('SF')
+ self.assert_(msg.get_flags() == 'FS')
+ self.assert_(msg.get_info() == '2,FS')
+ msg.set_info('1,')
+ self.assert_(msg.get_flags() == '')
+ self.assert_(msg.get_info() == '1,')
+ msg.remove_flag('RPT')
+ self.assert_(msg.get_flags() == '')
+ self.assert_(msg.get_info() == '1,')
+ msg.add_flag('D')
+ self.assert_(msg.get_flags() == 'D')
+ self.assert_(msg.get_info() == '2,D')
+ self._check_sample(msg)
+
+
+class _TestMboxMMDFMessage(TestMessage):
+
+ _factory = mailbox._mboxMMDFMessage
+
+ def _post_initialize_hook(self, msg):
+ self._check_from(msg)
+
+ def test_initialize_with_unixfrom(self):
+ # Initialize with a message that already has a _unixfrom attribute
+ msg = mailbox.Message(_sample_message)
+ msg.set_unixfrom('From foo@bar blah')
+ msg = mailbox.mboxMessage(msg)
+ self.assert_(msg.get_from() == 'foo@bar blah', msg.get_from())
+
+ def test_from(self):
+ # Get and set "From " line
+ msg = mailbox.mboxMessage(_sample_message)
+ self._check_from(msg)
+ msg.set_from('foo bar')
+ self.assert_(msg.get_from() == 'foo bar')
+ msg.set_from('foo@bar', True)
+ self._check_from(msg, 'foo@bar')
+ msg.set_from('blah@temp', time.localtime())
+ self._check_from(msg, 'blah@temp')
+
+ def test_flags(self):
+ # Use get_flags(), set_flags(), add_flag(), remove_flag()
+ msg = mailbox.mboxMessage(_sample_message)
+ self.assert_(msg.get_flags() == '')
+ msg.set_flags('F')
+ self.assert_(msg.get_flags() == 'F')
+ msg.set_flags('XODR')
+ self.assert_(msg.get_flags() == 'RODX')
+ msg.add_flag('FA')
+ self.assert_(msg.get_flags() == 'RODFAX')
+ msg.remove_flag('FDXA')
+ self.assert_(msg.get_flags() == 'RO')
+ self._check_sample(msg)
+
+ def _check_from(self, msg, sender=None):
+ # Check contents of "From " line
+ if sender is None:
+ sender = "MAILER-DAEMON"
+ self.assert_(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:"
+ r"\d{2} \d{4}", msg.get_from()) is not None)
+
+
+class TestMboxMessage(_TestMboxMMDFMessage):
+
+ _factory = mailbox.mboxMessage
+
+
+class TestMHMessage(TestMessage):
+
+ _factory = mailbox.MHMessage
+
+ def _post_initialize_hook(self, msg):
+ self.assert_(msg._sequences == [])
+
+ def test_sequences(self):
+ # Get, set, join, and leave sequences
+ msg = mailbox.MHMessage(_sample_message)
+ self.assert_(msg.get_sequences() == [])
+ msg.set_sequences(['foobar'])
+ self.assert_(msg.get_sequences() == ['foobar'])
+ msg.set_sequences([])
+ self.assert_(msg.get_sequences() == [])
+ msg.add_sequence('unseen')
+ self.assert_(msg.get_sequences() == ['unseen'])
+ msg.add_sequence('flagged')
+ self.assert_(msg.get_sequences() == ['unseen', 'flagged'])
+ msg.add_sequence('flagged')
+ self.assert_(msg.get_sequences() == ['unseen', 'flagged'])
+ msg.remove_sequence('unseen')
+ self.assert_(msg.get_sequences() == ['flagged'])
+ msg.add_sequence('foobar')
+ self.assert_(msg.get_sequences() == ['flagged', 'foobar'])
+ msg.remove_sequence('replied')
+ self.assert_(msg.get_sequences() == ['flagged', 'foobar'])
+ msg.set_sequences(['foobar', 'replied'])
+ self.assert_(msg.get_sequences() == ['foobar', 'replied'])
+
+
+class TestBabylMessage(TestMessage):
+
+ _factory = mailbox.BabylMessage
+
+ def _post_initialize_hook(self, msg):
+ self.assert_(msg._labels == [])
+
+ def test_labels(self):
+ # Get, set, join, and leave labels
+ msg = mailbox.BabylMessage(_sample_message)
+ self.assert_(msg.get_labels() == [])
+ msg.set_labels(['foobar'])
+ self.assert_(msg.get_labels() == ['foobar'])
+ msg.set_labels([])
+ self.assert_(msg.get_labels() == [])
+ msg.add_label('filed')
+ self.assert_(msg.get_labels() == ['filed'])
+ msg.add_label('resent')
+ self.assert_(msg.get_labels() == ['filed', 'resent'])
+ msg.add_label('resent')
+ self.assert_(msg.get_labels() == ['filed', 'resent'])
+ msg.remove_label('filed')
+ self.assert_(msg.get_labels() == ['resent'])
+ msg.add_label('foobar')
+ self.assert_(msg.get_labels() == ['resent', 'foobar'])
+ msg.remove_label('unseen')
+ self.assert_(msg.get_labels() == ['resent', 'foobar'])
+ msg.set_labels(['foobar', 'answered'])
+ self.assert_(msg.get_labels() == ['foobar', 'answered'])
+
+ def test_visible(self):
+ # Get, set, and update visible headers
+ msg = mailbox.BabylMessage(_sample_message)
+ visible = msg.get_visible()
+ self.assert_(visible.keys() == [])
+ self.assert_(visible.get_payload() is None)
+ visible['User-Agent'] = 'FooBar 1.0'
+ visible['X-Whatever'] = 'Blah'
+ self.assert_(msg.get_visible().keys() == [])
+ msg.set_visible(visible)
+ visible = msg.get_visible()
+ self.assert_(visible.keys() == ['User-Agent', 'X-Whatever'])
+ self.assert_(visible['User-Agent'] == 'FooBar 1.0')
+ self.assert_(visible['X-Whatever'] == 'Blah')
+ self.assert_(visible.get_payload() is None)
+ msg.update_visible()
+ self.assert_(visible.keys() == ['User-Agent', 'X-Whatever'])
+ self.assert_(visible.get_payload() is None)
+ visible = msg.get_visible()
+ self.assert_(visible.keys() == ['User-Agent', 'Date', 'From', 'To',
+ 'Subject'])
+ for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
+ self.assert_(visible[header] == msg[header])
+
+
+class TestMMDFMessage(_TestMboxMMDFMessage):
+
+ _factory = mailbox.MMDFMessage
+
+
+class TestMessageConversion(TestBase):
+
+ def test_plain_to_x(self):
+ # Convert Message to all formats
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ msg_plain = mailbox.Message(_sample_message)
+ msg = class_(msg_plain)
+ self._check_sample(msg)
+
+ def test_x_to_plain(self):
+ # Convert all formats to Message
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ msg = class_(_sample_message)
+ msg_plain = mailbox.Message(msg)
+ self._check_sample(msg_plain)
+
+ def test_x_to_invalid(self):
+ # Convert all formats to an invalid format
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ self.assertRaises(TypeError, lambda: class_(False))
+
+ def test_maildir_to_maildir(self):
+ # Convert MaildirMessage to MaildirMessage
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ msg_maildir.set_flags('DFPRST')
+ msg_maildir.set_subdir('cur')
+ date = msg_maildir.get_date()
+ msg = mailbox.MaildirMessage(msg_maildir)
+ self._check_sample(msg)
+ self.assert_(msg.get_flags() == 'DFPRST')
+ self.assert_(msg.get_subdir() == 'cur')
+ self.assert_(msg.get_date() == date)
+
+ def test_maildir_to_mboxmmdf(self):
+ # Convert MaildirMessage to mboxmessage and MMDFMessage
+ pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
+ ('T', 'D'), ('DFPRST', 'RDFA'))
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ msg_maildir.set_date(0.0)
+ for setting, result in pairs:
+ msg_maildir.set_flags(setting)
+ msg = class_(msg_maildir)
+ self.assert_(msg.get_flags() == result)
+ self.assert_(msg.get_from() == 'MAILER-DAEMON %s' %
+ time.asctime(time.gmtime(0.0)))
+ msg_maildir.set_subdir('cur')
+ self.assert_(class_(msg_maildir).get_flags() == 'RODFA')
+
+ def test_maildir_to_mh(self):
+ # Convert MaildirMessage to MHMessage
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
+ ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
+ ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
+ for setting, result in pairs:
+ msg_maildir.set_flags(setting)
+ self.assert_(mailbox.MHMessage(msg_maildir).get_sequences() == \
+ result)
+
+ def test_maildir_to_babyl(self):
+ # Convert MaildirMessage to Babyl
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ pairs = (('D', ['unseen']), ('F', ['unseen']),
+ ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
+ ('S', []), ('T', ['unseen', 'deleted']),
+ ('DFPRST', ['deleted', 'answered', 'forwarded']))
+ for setting, result in pairs:
+ msg_maildir.set_flags(setting)
+ self.assert_(mailbox.BabylMessage(msg_maildir).get_labels() == \
+ result)
+
+ def test_mboxmmdf_to_maildir(self):
+ # Convert mboxMessage and MMDFMessage to MaildirMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_mboxMMDF = class_(_sample_message)
+ msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
+ pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
+ ('RODFA', 'FRST'))
+ for setting, result in pairs:
+ msg_mboxMMDF.set_flags(setting)
+ msg = mailbox.MaildirMessage(msg_mboxMMDF)
+ self.assert_(msg.get_flags() == result)
+ self.assert_(msg.get_date() == 0.0, msg.get_date())
+ msg_mboxMMDF.set_flags('O')
+ self.assert_(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir() == \
+ 'cur')
+
+ def test_mboxmmdf_to_mboxmmdf(self):
+ # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_mboxMMDF = class_(_sample_message)
+ msg_mboxMMDF.set_flags('RODFA')
+ msg_mboxMMDF.set_from('foo@bar')
+ for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg2 = class2_(msg_mboxMMDF)
+ self.assert_(msg2.get_flags() == 'RODFA')
+ self.assert_(msg2.get_from() == 'foo@bar')
+
+ def test_mboxmmdf_to_mh(self):
+ # Convert mboxMessage and MMDFMessage to MHMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_mboxMMDF = class_(_sample_message)
+ pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
+ ('F', ['unseen', 'flagged']),
+ ('A', ['unseen', 'replied']),
+ ('RODFA', ['replied', 'flagged']))
+ for setting, result in pairs:
+ msg_mboxMMDF.set_flags(setting)
+ self.assert_(mailbox.MHMessage(msg_mboxMMDF).get_sequences() \
+ == result)
+
+ def test_mboxmmdf_to_babyl(self):
+ # Convert mboxMessage and MMDFMessage to BabylMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg = class_(_sample_message)
+ pairs = (('R', []), ('O', ['unseen']),
+ ('D', ['unseen', 'deleted']), ('F', ['unseen']),
+ ('A', ['unseen', 'answered']),
+ ('RODFA', ['deleted', 'answered']))
+ for setting, result in pairs:
+ msg.set_flags(setting)
+ self.assert_(mailbox.BabylMessage(msg).get_labels() == result)
+
+ def test_mh_to_maildir(self):
+ # Convert MHMessage to MaildirMessage
+ pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
+ for setting, result in pairs:
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence(setting)
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == result)
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'FR')
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+
+ def test_mh_to_mboxmmdf(self):
+ # Convert MHMessage to mboxMessage and MMDFMessage
+ pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
+ for setting, result in pairs:
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence(setting)
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ self.assert_(class_(msg).get_flags() == result)
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ self.assert_(class_(msg).get_flags() == 'OFA')
+
+ def test_mh_to_mh(self):
+ # Convert MHMessage to MHMessage
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ self.assert_(mailbox.MHMessage(msg).get_sequences() == \
+ ['unseen', 'replied', 'flagged'])
+
+ def test_mh_to_babyl(self):
+ # Convert MHMessage to BabylMessage
+ pairs = (('unseen', ['unseen']), ('replied', ['answered']),
+ ('flagged', []))
+ for setting, result in pairs:
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence(setting)
+ self.assert_(mailbox.BabylMessage(msg).get_labels() == result)
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ self.assert_(mailbox.BabylMessage(msg).get_labels() == \
+ ['unseen', 'answered'])
+
+ def test_babyl_to_maildir(self):
+ # Convert BabylMessage to MaildirMessage
+ pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
+ ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
+ ('resent', 'PS'))
+ for setting, result in pairs:
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.add_label(setting)
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == result)
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+ msg = mailbox.BabylMessage(_sample_message)
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'PRT')
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+
+ def test_babyl_to_mboxmmdf(self):
+ # Convert BabylMessage to mboxMessage and MMDFMessage
+ pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
+ ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
+ ('resent', 'RO'))
+ for setting, result in pairs:
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.add_label(setting)
+ self.assert_(class_(msg).get_flags() == result)
+ msg = mailbox.BabylMessage(_sample_message)
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ self.assert_(class_(msg).get_flags() == 'ODA')
+
+ def test_babyl_to_mh(self):
+ # Convert BabylMessage to MHMessage
+ pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
+ ('answered', ['replied']), ('forwarded', []), ('edited', []),
+ ('resent', []))
+ for setting, result in pairs:
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.add_label(setting)
+ self.assert_(mailbox.MHMessage(msg).get_sequences() == result)
+ msg = mailbox.BabylMessage(_sample_message)
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ self.assert_(mailbox.MHMessage(msg).get_sequences() == \
+ ['unseen', 'replied'])
+
+ def test_babyl_to_babyl(self):
+ # Convert BabylMessage to BabylMessage
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.update_visible()
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ msg2 = mailbox.BabylMessage(msg)
+ self.assert_(msg2.get_labels() == ['unseen', 'deleted', 'filed',
+ 'answered', 'forwarded', 'edited',
+ 'resent'])
+ self.assert_(msg.get_visible().keys() == msg2.get_visible().keys())
+ for key in msg.get_visible().keys():
+ self.assert_(msg.get_visible()[key] == msg2.get_visible()[key])
+
+
+class TestProxyFileBase(TestBase):
+
+ def _test_read(self, proxy):
+ # Read by byte
+ proxy.seek(0)
+ self.assert_(proxy.read() == 'bar')
+ proxy.seek(1)
+ self.assert_(proxy.read() == 'ar')
+ proxy.seek(0)
+ self.assert_(proxy.read(2) == 'ba')
+ proxy.seek(1)
+ self.assert_(proxy.read(-1) == 'ar')
+ proxy.seek(2)
+ self.assert_(proxy.read(1000) == 'r')
+
+ def _test_readline(self, proxy):
+ # Read by line
+ proxy.seek(0)
+ self.assert_(proxy.readline() == 'foo' + os.linesep)
+ self.assert_(proxy.readline() == 'bar' + os.linesep)
+ self.assert_(proxy.readline() == 'fred' + os.linesep)
+ self.assert_(proxy.readline() == 'bob')
+ proxy.seek(2)
+ self.assert_(proxy.readline() == 'o' + os.linesep)
+ proxy.seek(6 + 2 * len(os.linesep))
+ self.assert_(proxy.readline() == 'fred' + os.linesep)
+ proxy.seek(6 + 2 * len(os.linesep))
+ self.assert_(proxy.readline(2) == 'fr')
+ self.assert_(proxy.readline(-10) == 'ed' + os.linesep)
+
+ def _test_readlines(self, proxy):
+ # Read multiple lines
+ proxy.seek(0)
+ self.assert_(proxy.readlines() == ['foo' + os.linesep,
+ 'bar' + os.linesep,
+ 'fred' + os.linesep, 'bob'])
+ proxy.seek(0)
+ self.assert_(proxy.readlines(2) == ['foo' + os.linesep])
+ proxy.seek(3 + len(os.linesep))
+ self.assert_(proxy.readlines(4 + len(os.linesep)) ==
+ ['bar' + os.linesep, 'fred' + os.linesep])
+ proxy.seek(3)
+ self.assert_(proxy.readlines(1000) == [os.linesep, 'bar' + os.linesep,
+ 'fred' + os.linesep, 'bob'])
+
+ def _test_iteration(self, proxy):
+ # Iterate by line
+ proxy.seek(0)
+ iterator = iter(proxy)
+ self.assert_(iterator.next() == 'foo' + os.linesep)
+ self.assert_(iterator.next() == 'bar' + os.linesep)
+ self.assert_(iterator.next() == 'fred' + os.linesep)
+ self.assert_(iterator.next() == 'bob')
+ self.assertRaises(StopIteration, lambda: iterator.next())
+
+ def _test_seek_and_tell(self, proxy):
+ # Seek and use tell to check position
+ proxy.seek(3)
+ self.assert_(proxy.tell() == 3)
+ self.assert_(proxy.read(len(os.linesep)) == os.linesep)
+ proxy.seek(2, 1)
+ self.assert_(proxy.read(1 + len(os.linesep)) == 'r' + os.linesep)
+ proxy.seek(-3 - len(os.linesep), 2)
+ self.assert_(proxy.read(3) == 'bar')
+ proxy.seek(2, 0)
+ self.assert_(proxy.read() == 'o' + os.linesep + 'bar' + os.linesep)
+ proxy.seek(100)
+ self.assert_(proxy.read() == '')
+
+ def _test_close(self, proxy):
+ # Close a file
+ proxy.close()
+ self.assertRaises(AttributeError, lambda: proxy.close())
+
+
+class TestProxyFile(TestProxyFileBase):
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+ self._file = open(self._path, 'wb+')
+
+ def tearDown(self):
+ self._file.close()
+ self._delete_recursively(self._path)
+
+ def test_initialize(self):
+ # Initialize and check position
+ self._file.write('foo')
+ pos = self._file.tell()
+ proxy0 = mailbox._ProxyFile(self._file)
+ self.assert_(proxy0.tell() == pos)
+ self.assert_(self._file.tell() == pos)
+ proxy1 = mailbox._ProxyFile(self._file, 0)
+ self.assert_(proxy1.tell() == 0)
+ self.assert_(self._file.tell() == pos)
+
+ def test_read(self):
+ self._file.write('bar')
+ self._test_read(mailbox._ProxyFile(self._file))
+
+ def test_readline(self):
+ self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
+ os.linesep))
+ self._test_readline(mailbox._ProxyFile(self._file))
+
+ def test_readlines(self):
+ self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
+ os.linesep))
+ self._test_readlines(mailbox._ProxyFile(self._file))
+
+ def test_iteration(self):
+ self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
+ os.linesep))
+ self._test_iteration(mailbox._ProxyFile(self._file))
+
+ def test_seek_and_tell(self):
+ self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
+ self._test_seek_and_tell(mailbox._ProxyFile(self._file))
+
+ def test_close(self):
+ self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
+ self._test_close(mailbox._ProxyFile(self._file))
+
+
+class TestPartialFile(TestProxyFileBase):
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+ self._file = open(self._path, 'wb+')
+
+ def tearDown(self):
+ self._file.close()
+ self._delete_recursively(self._path)
+
+ def test_initialize(self):
+ # Initialize and check position
+ self._file.write('foo' + os.linesep + 'bar')
+ pos = self._file.tell()
+ proxy = mailbox._PartialFile(self._file, 2, 5)
+ self.assert_(proxy.tell() == 0)
+ self.assert_(self._file.tell() == pos)
+
+ def test_read(self):
+ self._file.write('***bar***')
+ self._test_read(mailbox._PartialFile(self._file, 3, 6))
+
+ def test_readline(self):
+ self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' %
+ (os.linesep, os.linesep, os.linesep))
+ self._test_readline(mailbox._PartialFile(self._file, 5,
+ 18 + 3 * len(os.linesep)))
+
+ def test_readlines(self):
+ self._file.write('foo%sbar%sfred%sbob?????' %
+ (os.linesep, os.linesep, os.linesep))
+ self._test_readlines(mailbox._PartialFile(self._file, 0,
+ 13 + 3 * len(os.linesep)))
+
+ def test_iteration(self):
+ self._file.write('____foo%sbar%sfred%sbob####' %
+ (os.linesep, os.linesep, os.linesep))
+ self._test_iteration(mailbox._PartialFile(self._file, 4,
+ 17 + 3 * len(os.linesep)))
+
+ def test_seek_and_tell(self):
+ self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep))
+ self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
+ 9 + 2 * len(os.linesep)))
+
+ def test_close(self):
+ self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep))
+ self._test_close(mailbox._PartialFile(self._file, 1,
+ 6 + 3 * len(os.linesep)))
+
+
+## Start: tests from the original module (for backward compatibility).
+
FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"
DUMMY_MESSAGE = """\
From: some.body@dummy.domain
@@ -65,15 +1626,15 @@ class MaildirTestCase(unittest.TestCase):
# Test for regression on bug #117490:
# Make sure the boxes attribute actually gets set.
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(hasattr(self.mbox, "boxes"))
- self.assert_(len(self.mbox.boxes) == 0)
+ #self.assert_(hasattr(self.mbox, "boxes"))
+ #self.assert_(len(self.mbox.boxes) == 0)
self.assert_(self.mbox.next() is None)
self.assert_(self.mbox.next() is None)
def test_nonempty_maildir_cur(self):
self.createMessage("cur")
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(len(self.mbox.boxes) == 1)
+ #self.assert_(len(self.mbox.boxes) == 1)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is None)
self.assert_(self.mbox.next() is None)
@@ -81,7 +1642,7 @@ class MaildirTestCase(unittest.TestCase):
def test_nonempty_maildir_new(self):
self.createMessage("new")
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(len(self.mbox.boxes) == 1)
+ #self.assert_(len(self.mbox.boxes) == 1)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is None)
self.assert_(self.mbox.next() is None)
@@ -90,7 +1651,7 @@ class MaildirTestCase(unittest.TestCase):
self.createMessage("cur")
self.createMessage("new")
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(len(self.mbox.boxes) == 2)
+ #self.assert_(len(self.mbox.boxes) == 2)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is None)
@@ -108,12 +1669,99 @@ class MaildirTestCase(unittest.TestCase):
self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
self.assertEqual(n, 1)
- # XXX We still need more tests!
+## End: classes from the original module (for backward compatibility).
+
+
+_sample_message = """\
+Return-Path: <gkj@gregorykjohnson.com>
+X-Original-To: gkj+person@localhost
+Delivered-To: gkj+person@localhost
+Received: from localhost (localhost [127.0.0.1])
+ by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
+ for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
+Delivered-To: gkj@sundance.gregorykjohnson.com
+Received: from localhost [127.0.0.1]
+ by localhost with POP3 (fetchmail-6.2.5)
+ for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
+Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
+ by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
+ for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
+Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
+ id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
+Date: Wed, 13 Jul 2005 17:23:11 -0400
+From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
+To: gkj@gregorykjohnson.com
+Subject: Sample message
+Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
+Mime-Version: 1.0
+Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
+Content-Disposition: inline
+User-Agent: Mutt/1.5.9i
+
+
+--NMuMz9nt05w80d4+
+Content-Type: text/plain; charset=us-ascii
+Content-Disposition: inline
+
+This is a sample message.
+
+--
+Gregory K. Johnson
+
+--NMuMz9nt05w80d4+
+Content-Type: application/octet-stream
+Content-Disposition: attachment; filename="text.gz"
+Content-Transfer-Encoding: base64
+
+H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
+3FYlAAAA
+
+--NMuMz9nt05w80d4+--
+"""
+
+_sample_headers = {
+ "Return-Path":"<gkj@gregorykjohnson.com>",
+ "X-Original-To":"gkj+person@localhost",
+ "Delivered-To":"gkj+person@localhost",
+ "Received":"""from localhost (localhost [127.0.0.1])
+ by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
+ for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
+ "Delivered-To":"gkj@sundance.gregorykjohnson.com",
+ "Received":"""from localhost [127.0.0.1]
+ by localhost with POP3 (fetchmail-6.2.5)
+ for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
+ "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
+ by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
+ for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
+ "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
+ id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
+ "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
+ "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
+ "To":"gkj@gregorykjohnson.com",
+ "Subject":"Sample message",
+ "Mime-Version":"1.0",
+ "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
+ "Content-Disposition":"inline",
+ "User-Agent": "Mutt/1.5.9i" }
+
+_sample_payloads = ("""This is a sample message.
+
+--
+Gregory K. Johnson
+""",
+"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
+3FYlAAAA
+""")
def test_main():
- test_support.run_unittest(MaildirTestCase)
+ tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
+ TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
+ TestMHMessage, TestBabylMessage, TestMMDFMessage,
+ TestMessageConversion, TestProxyFile, TestPartialFile,
+ MaildirTestCase)
+ test_support.run_unittest(*tests)
-if __name__ == "__main__":
+if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_multibytecodec.py b/Lib/test/test_multibytecodec.py
index 4d02dee..276b9af 100644
--- a/Lib/test/test_multibytecodec.py
+++ b/Lib/test/test_multibytecodec.py
@@ -3,7 +3,6 @@
# test_multibytecodec.py
# Unit test for multibytecodec itself
#
-# $CJKCodecs: test_multibytecodec.py,v 1.8 2004/06/19 06:09:55 perky Exp $
from test import test_support
from test import test_multibytecodec_support
diff --git a/Lib/test/test_old_mailbox.py b/Lib/test/test_old_mailbox.py
new file mode 100644
index 0000000..cca6897
--- /dev/null
+++ b/Lib/test/test_old_mailbox.py
@@ -0,0 +1,120 @@
+# This set of tests exercises the backward-compatibility class
+# in mailbox.py (the ones without write support).
+
+import mailbox
+import os
+import time
+import unittest
+from test import test_support
+
+# cleanup earlier tests
+try:
+ os.unlink(test_support.TESTFN)
+except os.error:
+ pass
+
+FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"
+DUMMY_MESSAGE = """\
+From: some.body@dummy.domain
+To: me@my.domain
+Subject: Simple Test
+
+This is a dummy message.
+"""
+
+class MaildirTestCase(unittest.TestCase):
+
+ def setUp(self):
+ # create a new maildir mailbox to work with:
+ self._dir = test_support.TESTFN
+ os.mkdir(self._dir)
+ os.mkdir(os.path.join(self._dir, "cur"))
+ os.mkdir(os.path.join(self._dir, "tmp"))
+ os.mkdir(os.path.join(self._dir, "new"))
+ self._counter = 1
+ self._msgfiles = []
+
+ def tearDown(self):
+ map(os.unlink, self._msgfiles)
+ os.rmdir(os.path.join(self._dir, "cur"))
+ os.rmdir(os.path.join(self._dir, "tmp"))
+ os.rmdir(os.path.join(self._dir, "new"))
+ os.rmdir(self._dir)
+
+ def createMessage(self, dir, mbox=False):
+ t = int(time.time() % 1000000)
+ pid = self._counter
+ self._counter += 1
+ filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain"))
+ tmpname = os.path.join(self._dir, "tmp", filename)
+ newname = os.path.join(self._dir, dir, filename)
+ fp = open(tmpname, "w")
+ self._msgfiles.append(tmpname)
+ if mbox:
+ fp.write(FROM_)
+ fp.write(DUMMY_MESSAGE)
+ fp.close()
+ if hasattr(os, "link"):
+ os.link(tmpname, newname)
+ else:
+ fp = open(newname, "w")
+ fp.write(DUMMY_MESSAGE)
+ fp.close()
+ self._msgfiles.append(newname)
+ return tmpname
+
+ def test_empty_maildir(self):
+ """Test an empty maildir mailbox"""
+ # Test for regression on bug #117490:
+ self.mbox = mailbox.Maildir(test_support.TESTFN)
+ self.assert_(len(self.mbox) == 0)
+ self.assert_(self.mbox.next() is None)
+ self.assert_(self.mbox.next() is None)
+
+ def test_nonempty_maildir_cur(self):
+ self.createMessage("cur")
+ self.mbox = mailbox.Maildir(test_support.TESTFN)
+ self.assert_(len(self.mbox) == 1)
+ self.assert_(self.mbox.next() is not None)
+ self.assert_(self.mbox.next() is None)
+ self.assert_(self.mbox.next() is None)
+
+ def test_nonempty_maildir_new(self):
+ self.createMessage("new")
+ self.mbox = mailbox.Maildir(test_support.TESTFN)
+ self.assert_(len(self.mbox) == 1)
+ self.assert_(self.mbox.next() is not None)
+ self.assert_(self.mbox.next() is None)
+ self.assert_(self.mbox.next() is None)
+
+ def test_nonempty_maildir_both(self):
+ self.createMessage("cur")
+ self.createMessage("new")
+ self.mbox = mailbox.Maildir(test_support.TESTFN)
+ self.assert_(len(self.mbox) == 2)
+ self.assert_(self.mbox.next() is not None)
+ self.assert_(self.mbox.next() is not None)
+ self.assert_(self.mbox.next() is None)
+ self.assert_(self.mbox.next() is None)
+
+ def test_unix_mbox(self):
+ ### should be better!
+ import email.Parser
+ fname = self.createMessage("cur", True)
+ n = 0
+ for msg in mailbox.PortableUnixMailbox(open(fname),
+ email.Parser.Parser().parse):
+ n += 1
+ self.assertEqual(msg["subject"], "Simple Test")
+ self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
+ self.assertEqual(n, 1)
+
+ # XXX We still need more tests!
+
+
+def test_main():
+ test_support.run_unittest(MaildirTestCase)
+
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_optparse.py b/Lib/test/test_optparse.py
index f656b9f..991c06d 100644
--- a/Lib/test/test_optparse.py
+++ b/Lib/test/test_optparse.py
@@ -10,17 +10,22 @@
import sys
import os
+import re
import copy
+import types
import unittest
from cStringIO import StringIO
from pprint import pprint
from test import test_support
+
from optparse import make_option, Option, IndentedHelpFormatter, \
TitledHelpFormatter, OptionParser, OptionContainer, OptionGroup, \
SUPPRESS_HELP, SUPPRESS_USAGE, OptionError, OptionConflictError, \
- BadOptionError, OptionValueError, Values, _match_abbrev
+ BadOptionError, OptionValueError, Values
+from optparse import _match_abbrev
+from optparse import _parse_num
# Do the right thing with boolean values for all known Python versions.
try:
@@ -28,6 +33,7 @@ try:
except NameError:
(True, False) = (1, 0)
+retype = type(re.compile(''))
class InterceptedError(Exception):
def __init__(self,
@@ -96,7 +102,8 @@ Args were %(args)s.""" % locals ())
args -- positional arguments to `func`
kwargs -- keyword arguments to `func`
expected_exception -- exception that should be raised
- expected_output -- output we expect to see
+ expected_message -- expected exception message (or pattern
+ if a compiled regex object)
Returns the exception raised for further testing.
"""
@@ -109,14 +116,23 @@ Args were %(args)s.""" % locals ())
func(*args, **kwargs)
except expected_exception, err:
actual_message = str(err)
- self.assertEqual(actual_message,
- expected_message,
+ if isinstance(expected_message, retype):
+ self.assert_(expected_message.search(actual_message),
"""\
+expected exception message pattern:
+/%s/
+actual exception message:
+'''%s'''
+""" % (expected_message.pattern, actual_message))
+ else:
+ self.assertEqual(actual_message,
+ expected_message,
+ """\
expected exception message:
-'''%(expected_message)s'''
+'''%s'''
actual exception message:
-'''%(actual_message)s'''
-""" % locals())
+'''%s'''
+""" % (expected_message, actual_message))
return err
else:
@@ -157,7 +173,9 @@ and kwargs %(kwargs)r
sys.stdout = save_stdout
except InterceptedError, err:
- self.assertEqual(output, expected_output)
+ if output != expected_output:
+ self.fail("expected: \n'''\n" + expected_output +
+ "'''\nbut got \n'''\n" + output + "'''")
self.assertEqual(err.exit_status, expected_status)
self.assertEqual(err.exit_message, expected_error)
else:
@@ -366,6 +384,23 @@ class TestOptionParser(BaseTest):
self.assertRaises(self.parser.remove_option, ('foo',), None,
ValueError, "no such option 'foo'")
+ def test_refleak(self):
+ # If an OptionParser is carrying around a reference to a large
+ # object, various cycles can prevent it from being GC'd in
+ # a timely fashion. destroy() breaks the cycles to ensure stuff
+ # can be cleaned up.
+ big_thing = [42]
+ refcount = sys.getrefcount(big_thing)
+ parser = OptionParser()
+ parser.add_option("-a", "--aaarggh")
+ parser.big_thing = big_thing
+
+ parser.destroy()
+ #self.assertEqual(refcount, sys.getrefcount(big_thing))
+ del parser
+ self.assertEqual(refcount, sys.getrefcount(big_thing))
+
+
class TestOptionValues(BaseTest):
def setUp(self):
pass
@@ -391,13 +426,21 @@ class TestTypeAliases(BaseTest):
def setUp(self):
self.parser = OptionParser()
- def test_type_aliases(self):
- self.parser.add_option("-x", type=int)
+ def test_str_aliases_string(self):
+ self.parser.add_option("-s", type="str")
+ self.assertEquals(self.parser.get_option("-s").type, "string")
+
+ def test_new_type_object(self):
self.parser.add_option("-s", type=str)
- self.parser.add_option("-t", type="str")
+ self.assertEquals(self.parser.get_option("-s").type, "string")
+ self.parser.add_option("-x", type=int)
self.assertEquals(self.parser.get_option("-x").type, "int")
+
+ def test_old_type_object(self):
+ self.parser.add_option("-s", type=types.StringType)
self.assertEquals(self.parser.get_option("-s").type, "string")
- self.assertEquals(self.parser.get_option("-t").type, "string")
+ self.parser.add_option("-x", type=types.IntType)
+ self.assertEquals(self.parser.get_option("-x").type, "int")
# Custom type for testing processing of default values.
@@ -487,13 +530,13 @@ class TestProgName(BaseTest):
save_argv = sys.argv[:]
try:
sys.argv[0] = os.path.join("foo", "bar", "baz.py")
- parser = OptionParser("usage: %prog ...", version="%prog 1.2")
- expected_usage = "usage: baz.py ...\n"
+ parser = OptionParser("%prog ...", version="%prog 1.2")
+ expected_usage = "Usage: baz.py ...\n"
self.assertUsage(parser, expected_usage)
self.assertVersion(parser, "baz.py 1.2")
self.assertHelp(parser,
expected_usage + "\n" +
- "options:\n"
+ "Options:\n"
" --version show program's version number and exit\n"
" -h, --help show this help message and exit\n")
finally:
@@ -505,7 +548,7 @@ class TestProgName(BaseTest):
usage="%prog arg arg")
parser.remove_option("-h")
parser.remove_option("--version")
- expected_usage = "usage: thingy arg arg\n"
+ expected_usage = "Usage: thingy arg arg\n"
self.assertUsage(parser, expected_usage)
self.assertVersion(parser, "thingy 0.1")
self.assertHelp(parser, expected_usage + "\n")
@@ -515,9 +558,9 @@ class TestExpandDefaults(BaseTest):
def setUp(self):
self.parser = OptionParser(prog="test")
self.help_prefix = """\
-usage: test [options]
+Usage: test [options]
-options:
+Options:
-h, --help show this help message and exit
"""
self.file_help = "read from FILE [default: %default]"
@@ -699,13 +742,16 @@ class TestStandard(BaseTest):
self.assertParseOK(["-a", "--", "foo", "bar"],
{'a': "--", 'boo': None, 'foo': None},
["foo", "bar"]),
+ self.assertParseOK(["-a", "--", "--foo", "bar"],
+ {'a': "--", 'boo': None, 'foo': ["bar"]},
+ []),
def test_short_option_joined_and_separator(self):
self.assertParseOK(["-ab", "--", "--foo", "bar"],
{'a': "b", 'boo': None, 'foo': None},
["--foo", "bar"]),
- def test_invalid_option_becomes_positional_arg(self):
+ def test_hyphen_becomes_positional_arg(self):
self.assertParseOK(["-ab", "-", "--foo", "bar"],
{'a': "b", 'boo': None, 'foo': ["bar"]},
["-"])
@@ -870,6 +916,8 @@ class TestMultipleArgsAppend(BaseTest):
type="float", dest="point")
self.parser.add_option("-f", "--foo", action="append", nargs=2,
type="int", dest="foo")
+ self.parser.add_option("-z", "--zero", action="append_const",
+ dest="foo", const=(0, 0))
def test_nargs_append(self):
self.assertParseOK(["-f", "4", "-3", "blah", "--foo", "1", "666"],
@@ -885,6 +933,11 @@ class TestMultipleArgsAppend(BaseTest):
{'point': None, 'foo':[(3, 4)]},
[])
+ def test_nargs_append_const(self):
+ self.assertParseOK(["--zero", "--foo", "3", "4", "-z"],
+ {'point': None, 'foo':[(0, 0), (3, 4), (0, 0)]},
+ [])
+
class TestVersion(BaseTest):
def test_version(self):
self.parser = InterceptingOptionParser(usage=SUPPRESS_USAGE,
@@ -960,8 +1013,14 @@ class TestExtendAddTypes(BaseTest):
self.parser.add_option("-a", None, type="string", dest="a")
self.parser.add_option("-f", "--file", type="file", dest="file")
+ def tearDown(self):
+ if os.path.isdir(test_support.TESTFN):
+ os.rmdir(test_support.TESTFN)
+ elif os.path.isfile(test_support.TESTFN):
+ os.unlink(test_support.TESTFN)
+
class MyOption (Option):
- def check_file (option, opt, value):
+ def check_file(option, opt, value):
if not os.path.exists(value):
raise OptionValueError("%s: file does not exist" % value)
elif not os.path.isfile(value):
@@ -972,25 +1031,23 @@ class TestExtendAddTypes(BaseTest):
TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
TYPE_CHECKER["file"] = check_file
- def test_extend_file(self):
+ def test_filetype_ok(self):
open(test_support.TESTFN, "w").close()
self.assertParseOK(["--file", test_support.TESTFN, "-afoo"],
{'file': test_support.TESTFN, 'a': 'foo'},
[])
- os.unlink(test_support.TESTFN)
-
- def test_extend_file_nonexistent(self):
+ def test_filetype_noexist(self):
self.assertParseFail(["--file", test_support.TESTFN, "-afoo"],
"%s: file does not exist" %
test_support.TESTFN)
- def test_file_irregular(self):
+ def test_filetype_notfile(self):
os.mkdir(test_support.TESTFN)
self.assertParseFail(["--file", test_support.TESTFN, "-afoo"],
"%s: not a regular file" %
test_support.TESTFN)
- os.rmdir(test_support.TESTFN)
+
class TestExtendAddActions(BaseTest):
def setUp(self):
@@ -1003,7 +1060,7 @@ class TestExtendAddActions(BaseTest):
STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",)
TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",)
- def take_action (self, action, dest, opt, value, values, parser):
+ def take_action(self, action, dest, opt, value, values, parser):
if action == "extend":
lvalue = value.split(",")
values.ensure_value(dest, []).extend(lvalue)
@@ -1072,7 +1129,7 @@ class TestCallback(BaseTest):
callback=lambda: None, type="string",
help="foo")
- expected_help = ("options:\n"
+ expected_help = ("Options:\n"
" -t TEST, --test=TEST foo\n")
self.assertHelp(parser, expected_help)
@@ -1085,7 +1142,7 @@ class TestCallbackExtraArgs(BaseTest):
dest="points", default=[])]
self.parser = OptionParser(option_list=options)
- def process_tuple (self, option, opt, value, parser_, len, type):
+ def process_tuple(self, option, opt, value, parser_, len, type):
self.assertEqual(len, 3)
self.assert_(type is int)
@@ -1110,7 +1167,7 @@ class TestCallbackMeddleArgs(BaseTest):
self.parser = OptionParser(option_list=options)
# Callback that meddles in rargs, largs
- def process_n (self, option, opt, value, parser_):
+ def process_n(self, option, opt, value, parser_):
# option is -3, -5, etc.
nargs = int(opt[1:])
rargs = parser_.rargs
@@ -1139,7 +1196,7 @@ class TestCallbackManyArgs(BaseTest):
callback=self.process_many, type="int")]
self.parser = OptionParser(option_list=options)
- def process_many (self, option, opt, value, parser_):
+ def process_many(self, option, opt, value, parser_):
if opt == "-a":
self.assertEqual(value, ("foo", "bar"))
elif opt == "--apple":
@@ -1162,7 +1219,7 @@ class TestCallbackCheckAbbrev(BaseTest):
self.parser.add_option("--foo-bar", action="callback",
callback=self.check_abbrev)
- def check_abbrev (self, option, opt, value, parser):
+ def check_abbrev(self, option, opt, value, parser):
self.assertEqual(opt, "--foo-bar")
def test_abbrev_callback_expansion(self):
@@ -1177,7 +1234,7 @@ class TestCallbackVarArgs(BaseTest):
self.parser = InterceptingOptionParser(usage=SUPPRESS_USAGE,
option_list=options)
- def variable_args (self, option, opt, value, parser):
+ def variable_args(self, option, opt, value, parser):
self.assert_(value is None)
done = 0
value = []
@@ -1229,7 +1286,7 @@ class ConflictBase(BaseTest):
self.parser = InterceptingOptionParser(usage=SUPPRESS_USAGE,
option_list=options)
- def show_version (self, option, opt, value, parser):
+ def show_version(self, option, opt, value, parser):
parser.values.show_version = 1
class TestConflict(ConflictBase):
@@ -1280,7 +1337,7 @@ class TestConflictResolve(ConflictBase):
def test_conflict_resolve_help(self):
self.assertOutput(["-h"], """\
-options:
+Options:
--verbose increment verbosity
-h, --help show this help message and exit
-v, --version show version
@@ -1319,7 +1376,7 @@ class TestConflictOverride(BaseTest):
def test_conflict_override_help(self):
self.assertOutput(["-h"], """\
-options:
+Options:
-h, --help show this help message and exit
-n, --dry-run dry run mode
""")
@@ -1332,9 +1389,9 @@ options:
# -- Other testing. ----------------------------------------------------
_expected_help_basic = """\
-usage: bar.py [options]
+Usage: bar.py [options]
-options:
+Options:
-a APPLE throw APPLEs at basket
-b NUM, --boo=NUM shout "boo!" NUM times (in order to frighten away all the
evil spirits that cause trouble and mayhem)
@@ -1343,9 +1400,9 @@ options:
"""
_expected_help_long_opts_first = """\
-usage: bar.py [options]
+Usage: bar.py [options]
-options:
+Options:
-a APPLE throw APPLEs at basket
--boo=NUM, -b NUM shout "boo!" NUM times (in order to frighten away all the
evil spirits that cause trouble and mayhem)
@@ -1358,7 +1415,7 @@ Usage
=====
bar.py [options]
-options
+Options
=======
-a APPLE throw APPLEs at basket
--boo=NUM, -b NUM shout "boo!" NUM times (in order to frighten away all the
@@ -1368,9 +1425,9 @@ options
"""
_expected_help_short_lines = """\
-usage: bar.py [options]
+Usage: bar.py [options]
-options:
+Options:
-a APPLE throw APPLEs at basket
-b NUM, --boo=NUM shout "boo!" NUM times (in order to
frighten away all the evil spirits
@@ -1382,15 +1439,8 @@ options:
class TestHelp(BaseTest):
def setUp(self):
- self.orig_columns = os.environ.get('COLUMNS')
self.parser = self.make_parser(80)
- def tearDown(self):
- if self.orig_columns is None:
- del os.environ['COLUMNS']
- else:
- os.environ['COLUMNS'] = self.orig_columns
-
def make_parser(self, columns):
options = [
make_option("-a", type="string", dest='a',
@@ -1419,7 +1469,7 @@ class TestHelp(BaseTest):
self.assertHelpEquals(_expected_help_basic)
def test_help_old_usage(self):
- self.parser.set_usage("usage: %prog [options]")
+ self.parser.set_usage("Usage: %prog [options]")
self.assertHelpEquals(_expected_help_basic)
def test_help_long_opts_first(self):
@@ -1449,13 +1499,13 @@ class TestHelp(BaseTest):
group.add_option("-g", action="store_true", help="Group option.")
self.parser.add_option_group(group)
- self.assertHelpEquals("""\
-usage: bar.py [options]
+ expect = """\
+Usage: bar.py [options]
This is the program description for bar.py. bar.py has an option group as
well as single options.
-options:
+Options:
-a APPLE throw APPLEs at basket
-b NUM, --boo=NUM shout "boo!" NUM times (in order to frighten away all the
evil spirits that cause trouble and mayhem)
@@ -1467,9 +1517,12 @@ options:
that some of them bite.
-g Group option.
-""")
+"""
+ self.assertHelpEquals(expect)
+ self.parser.epilog = "Please report bugs to /dev/null."
+ self.assertHelpEquals(expect + "\nPlease report bugs to /dev/null.\n")
class TestMatchAbbrev(BaseTest):
@@ -1490,6 +1543,43 @@ class TestMatchAbbrev(BaseTest):
BadOptionError, "ambiguous option: --f (%s?)" % possibilities)
+class TestParseNumber(BaseTest):
+ def setUp(self):
+ self.parser = InterceptingOptionParser()
+ self.parser.add_option("-n", type=int)
+ self.parser.add_option("-l", type=long)
+
+ def test_parse_num_fail(self):
+ self.assertRaises(
+ _parse_num, ("", int), {},
+ ValueError,
+ re.compile(r"invalid literal for int().*: '?'?"))
+ self.assertRaises(
+ _parse_num, ("0xOoops", long), {},
+ ValueError,
+ re.compile(r"invalid literal for long().*: '?0xOoops'?"))
+
+ def test_parse_num_ok(self):
+ self.assertEqual(_parse_num("0", int), 0)
+ self.assertEqual(_parse_num("0x10", int), 16)
+ self.assertEqual(_parse_num("0XA", long), 10L)
+ self.assertEqual(_parse_num("010", long), 8L)
+ self.assertEqual(_parse_num("0b11", int), 3)
+ self.assertEqual(_parse_num("0b", long), 0L)
+
+ def test_numeric_options(self):
+ self.assertParseOK(["-n", "42", "-l", "0x20"],
+ { "n": 42, "l": 0x20 }, [])
+ self.assertParseOK(["-n", "0b0101", "-l010"],
+ { "n": 5, "l": 8 }, [])
+ self.assertParseFail(["-n008"],
+ "option -n: invalid integer value: '008'")
+ self.assertParseFail(["-l0b0123"],
+ "option -l: invalid long integer value: '0b0123'")
+ self.assertParseFail(["-l", "0x12x"],
+ "option -l: invalid long integer value: '0x12x'")
+
+
def _testclasses():
mod = sys.modules[__name__]
return [getattr(mod, name) for name in dir(mod) if name.startswith('Test')]
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 2bc5fc0..ffc9420 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -5,6 +5,7 @@
import os
import unittest
import warnings
+import sys
from test import test_support
warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
@@ -364,6 +365,32 @@ class URandomTests (unittest.TestCase):
except NotImplementedError:
pass
+class Win32ErrorTests(unittest.TestCase):
+ def test_rename(self):
+ self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")
+
+ def test_remove(self):
+ self.assertRaises(WindowsError, os.remove, test_support.TESTFN)
+
+ def test_chdir(self):
+ self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)
+
+ def test_mkdir(self):
+ self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)
+
+ def test_utime(self):
+ self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)
+
+ def test_access(self):
+ self.assertRaises(WindowsError, os.utime, test_support.TESTFN, 0)
+
+ def test_chmod(self):
+ self.assertRaises(WindowsError, os.utime, test_support.TESTFN, 0)
+
+if sys.platform != 'win32':
+ class Win32ErrorTests(unittest.TestCase):
+ pass
+
def test_main():
test_support.run_unittest(
TemporaryFileTests,
@@ -372,7 +399,8 @@ def test_main():
WalkTests,
MakedirTests,
DevNullTests,
- URandomTests
+ URandomTests,
+ Win32ErrorTests
)
if __name__ == "__main__":
diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py
index 99e01b6..59e5162 100644
--- a/Lib/test/test_pty.py
+++ b/Lib/test/test_pty.py
@@ -4,13 +4,6 @@ from test.test_support import verbose, TestFailed, TestSkipped
TEST_STRING_1 = "I wish to buy a fish license.\n"
TEST_STRING_2 = "For my pet fish, Eric.\n"
-# Solaris (at least 2.9 and 2.10) seem to have a fickle isatty(). The first
-# test below, testing the result of os.openpty() for tty-ness, sometimes
-# (but not always) fails. The second isatty test, in the sub-process, always
-# works. Allow that fickle first test to fail on these platforms, since it
-# doesn't actually affect functionality.
-fickle_isatty = ["sunos5"]
-
if verbose:
def debug(msg):
print msg
@@ -31,11 +24,11 @@ def normalize_output(data):
# OSF/1 (Tru64) apparently turns \n into \r\r\n.
if data.endswith('\r\r\n'):
- return data[:-3] + '\n'
+ return data.replace('\r\r\n', '\n')
# IRIX apparently turns \n into \r\n.
if data.endswith('\r\n'):
- return data[:-2] + '\n'
+ return data.replace('\r\n', '\n')
return data
@@ -54,7 +47,7 @@ def test_basic_pty():
# " An optional feature could not be imported " ... ?
raise TestSkipped, "Pseudo-terminals (seemingly) not functional."
- if not os.isatty(slave_fd) and sys.platform not in fickle_isatty:
+ if not os.isatty(slave_fd):
raise TestFailed, "slave_fd is not a tty"
debug("Writing to slave_fd")
diff --git a/Lib/test/test_rfc822.py b/Lib/test/test_rfc822.py
index 0d9f28a..6d22825 100644
--- a/Lib/test/test_rfc822.py
+++ b/Lib/test/test_rfc822.py
@@ -45,6 +45,10 @@ class MessageTestCase(unittest.TestCase):
print 'extra parsed address:', repr(n), repr(a)
continue
i = i + 1
+ self.assertEqual(mn, n,
+ "Un-expected name: %s != %s" % (`mn`, `n`))
+ self.assertEqual(ma, a,
+ "Un-expected address: %s != %s" % (`ma`, `a`))
if mn == n and ma == a:
pass
else:
@@ -129,6 +133,12 @@ class MessageTestCase(unittest.TestCase):
'To: person@dom.ain (User J. Person)\n\n',
[('User J. Person', 'person@dom.ain')])
+ def test_doublecomment(self):
+ # The RFC allows comments within comments in an email addr
+ self.check(
+ 'To: person@dom.ain ((User J. Person)), John Doe <foo@bar.com>\n\n',
+ [('User J. Person', 'person@dom.ain'), ('John Doe', 'foo@bar.com')])
+
def test_twisted(self):
# This one is just twisted. I don't know what the proper
# result should be, but it shouldn't be to infloop, which is
diff --git a/Lib/test/test_setuptools.py b/Lib/test/test_setuptools.py
deleted file mode 100644
index a988303..0000000
--- a/Lib/test/test_setuptools.py
+++ /dev/null
@@ -1,16 +0,0 @@
-"""Tests for setuptools.
-
-The tests for setuptools are defined in the setuptools.tests package;
-this runs them from there.
-"""
-
-import test.test_support
-from setuptools.command.test import ScanningLoader
-
-def test_main():
- test.test_support.run_suite(
- ScanningLoader().loadTestsFromName('setuptools.tests')
- )
-
-if __name__ == "__main__":
- test_main()
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 7c28602..6ab5a35 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -48,12 +48,12 @@ class TestShutil(unittest.TestCase):
if self.errorState == 0:
self.assertEqual(func, os.remove)
self.assertEqual(arg, self.childpath)
- self.assertEqual(exc[0], OSError)
+ self.failUnless(issubclass(exc[0], OSError))
self.errorState = 1
else:
self.assertEqual(func, os.rmdir)
self.assertEqual(arg, TESTFN)
- self.assertEqual(exc[0], OSError)
+ self.failUnless(issubclass(exc[0], OSError))
self.errorState = 2
def test_rmtree_dont_delete_file(self):
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 6943080..2246fb6 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -9,6 +9,7 @@ import time
import thread, threading
import Queue
import sys
+import array
from weakref import proxy
PORT = 50007
@@ -852,8 +853,38 @@ class TestLinuxAbstractNamespace(unittest.TestCase):
self.assertRaises(socket.error, s.bind, address)
+class BufferIOTest(SocketConnectedTest):
+ """
+ Test the buffer versions of socket.recv() and socket.send().
+ """
+ def __init__(self, methodName='runTest'):
+ SocketConnectedTest.__init__(self, methodName=methodName)
+
+ def testRecvBuf(self):
+ buf = array.array('c', ' '*1024)
+ nbytes = self.cli_conn.recv_buf(buf)
+ self.assertEqual(nbytes, len(MSG))
+ msg = buf.tostring()[:len(MSG)]
+ self.assertEqual(msg, MSG)
+
+ def _testRecvBuf(self):
+ buf = buffer(MSG)
+ self.serv_conn.send(buf)
+
+ def testRecvFromBuf(self):
+ buf = array.array('c', ' '*1024)
+ nbytes, addr = self.cli_conn.recvfrom_buf(buf)
+ self.assertEqual(nbytes, len(MSG))
+ msg = buf.tostring()[:len(MSG)]
+ self.assertEqual(msg, MSG)
+
+ def _testRecvFromBuf(self):
+ buf = buffer(MSG)
+ self.serv_conn.send(buf)
+
def test_main():
- tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions]
+ tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions,
+ BufferIOTest]
if sys.platform != 'mac':
tests.extend([ BasicUDPTest, UDPTimeoutTest ])
diff --git a/Lib/test/test_sqlite.py b/Lib/test/test_sqlite.py
index 1b1d0e5..f033772 100644
--- a/Lib/test/test_sqlite.py
+++ b/Lib/test/test_sqlite.py
@@ -6,11 +6,12 @@ try:
except ImportError:
raise TestSkipped('no sqlite available')
from sqlite3.test import (dbapi, types, userfunctions,
- factory, transactions)
+ factory, transactions, hooks, regression)
def test_main():
run_unittest(dbapi.suite(), types.suite(), userfunctions.suite(),
- factory.suite(), transactions.suite())
+ factory.suite(), transactions.suite(), hooks.suite(),
+ regression.suite())
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_stringprep.py b/Lib/test/test_stringprep.py
index 4459689..2baf4a5 100644
--- a/Lib/test/test_stringprep.py
+++ b/Lib/test/test_stringprep.py
@@ -2,7 +2,6 @@
# Since we don't have them, this test checks only a few codepoints.
from test.test_support import verify, vereq
-import sha
import stringprep
from stringprep import *
@@ -73,6 +72,7 @@ verify(not in_table_d2(u"\u0040"))
# unicode database. Instead, stringprep.py asserts the version of
# the database.
+# import hashlib
# predicates = [k for k in dir(stringprep) if k.startswith("in_table")]
# predicates.sort()
# for p in predicates:
@@ -83,6 +83,6 @@ verify(not in_table_d2(u"\u0040"))
# if f(unichr(i)):
# data[i] = "1"
# data = "".join(data)
-# h = sha.sha()
+# h = hashlib.sha1()
# h.update(data)
-# print p,h.hexdigest()
+# print p, h.hexdigest()
diff --git a/Lib/test/test_struct.py b/Lib/test/test_struct.py
index 9332466..7981a52 100644
--- a/Lib/test/test_struct.py
+++ b/Lib/test/test_struct.py
@@ -1,5 +1,8 @@
from test.test_support import TestFailed, verbose, verify
+import test.test_support
import struct
+import array
+import unittest
import sys
ISBIGENDIAN = sys.byteorder == "big"
@@ -7,6 +10,8 @@ del sys
verify((struct.pack('=i', 1)[0] == chr(0)) == ISBIGENDIAN,
"bigendian determination appears wrong")
+PY_STRUCT_RANGE_CHECKING = 1
+
def string_reverse(s):
chars = list(s)
chars.reverse()
@@ -263,7 +268,7 @@ class IntTester:
else:
# x is out of range -- verify pack realizes that.
- if code in self.BUGGY_RANGE_CHECK:
+ if not PY_STRUCT_RANGE_CHECKING and code in self.BUGGY_RANGE_CHECK:
if verbose:
print "Skipping buggy range check for code", code
else:
@@ -318,7 +323,7 @@ class IntTester:
else:
# x is out of range -- verify pack realizes that.
- if code in self.BUGGY_RANGE_CHECK:
+ if not PY_STRUCT_RANGE_CHECKING and code in self.BUGGY_RANGE_CHECK:
if verbose:
print "Skipping buggy range check for code", code
else:
@@ -437,3 +442,97 @@ def test_705836():
TestFailed("expected OverflowError")
test_705836()
+
+def test_1229380():
+ import sys
+ for endian in ('', '>', '<'):
+ for cls in (int, long):
+ for fmt in ('B', 'H', 'I', 'L'):
+ any_err(struct.pack, endian + fmt, cls(-1))
+
+ any_err(struct.pack, endian + 'B', cls(300))
+ any_err(struct.pack, endian + 'H', cls(70000))
+
+ any_err(struct.pack, endian + 'I', sys.maxint * 4L)
+ any_err(struct.pack, endian + 'L', sys.maxint * 4L)
+
+if PY_STRUCT_RANGE_CHECKING:
+ test_1229380()
+
+class PackBufferTestCase(unittest.TestCase):
+ """
+ Test the packing methods that work on buffers.
+ """
+
+ def test_unpack_from( self ):
+ test_string = 'abcd01234'
+ fmt = '4s'
+ s = struct.Struct(fmt)
+ for cls in (str, buffer):
+ data = cls(test_string)
+ self.assertEquals(s.unpack_from(data), ('abcd',))
+ self.assertEquals(s.unpack_from(data, 2), ('cd01',))
+ self.assertEquals(s.unpack_from(data, 4), ('0123',))
+ for i in xrange(6):
+ self.assertEquals(s.unpack_from(data, i), (data[i:i+4],))
+ for i in xrange(6, len(test_string) + 1):
+ simple_err(s.unpack_from, data, i)
+ for cls in (str, buffer):
+ data = cls(test_string)
+ self.assertEquals(struct.unpack_from(fmt, data), ('abcd',))
+ self.assertEquals(struct.unpack_from(fmt, data, 2), ('cd01',))
+ self.assertEquals(struct.unpack_from(fmt, data, 4), ('0123',))
+ for i in xrange(6):
+ self.assertEquals(struct.unpack_from(fmt, data, i),
+ (data[i:i+4],))
+ for i in xrange(6, len(test_string) + 1):
+ simple_err(struct.unpack_from, fmt, data, i)
+
+ def test_pack_to( self ):
+ test_string = 'Reykjavik rocks, eow!'
+ writable_buf = array.array('c', ' '*100)
+ fmt = '21s'
+ s = struct.Struct(fmt)
+
+ # Test without offset
+ s.pack_to(writable_buf, 0, test_string)
+ from_buf = writable_buf.tostring()[:len(test_string)]
+ self.assertEquals(from_buf, test_string)
+
+ # Test with offset.
+ s.pack_to(writable_buf, 10, test_string)
+ from_buf = writable_buf.tostring()[:len(test_string)+10]
+ self.assertEquals(from_buf, (test_string[:10] + test_string))
+
+ # Go beyond boundaries.
+ small_buf = array.array('c', ' '*10)
+ self.assertRaises(struct.error, s.pack_to, small_buf, 0, test_string)
+ self.assertRaises(struct.error, s.pack_to, small_buf, 2, test_string)
+
+ def test_pack_to_fn( self ):
+ test_string = 'Reykjavik rocks, eow!'
+ writable_buf = array.array('c', ' '*100)
+ fmt = '21s'
+ pack_to = lambda *args: struct.pack_to(fmt, *args)
+
+ # Test without offset
+ pack_to(writable_buf, 0, test_string)
+ from_buf = writable_buf.tostring()[:len(test_string)]
+ self.assertEquals(from_buf, test_string)
+
+ # Test with offset.
+ pack_to(writable_buf, 10, test_string)
+ from_buf = writable_buf.tostring()[:len(test_string)+10]
+ self.assertEquals(from_buf, (test_string[:10] + test_string))
+
+ # Go beyond boundaries.
+ small_buf = array.array('c', ' '*10)
+ self.assertRaises(struct.error, pack_to, small_buf, 0, test_string)
+ self.assertRaises(struct.error, pack_to, small_buf, 2, test_string)
+
+
+def test_main():
+ test.test_support.run_unittest(PackBufferTestCase)
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index c351ee9..edf5bd0 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -347,7 +347,7 @@ class ProcessTestCase(unittest.TestCase):
stdout=subprocess.PIPE,
universal_newlines=1)
stdout = p.stdout.read()
- if hasattr(open, 'newlines'):
+ if hasattr(file, 'newlines'):
# Interpreter with universal newline support
self.assertEqual(stdout,
"line1\nline2\nline3\nline4\nline5\nline6")
@@ -374,7 +374,7 @@ class ProcessTestCase(unittest.TestCase):
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=1)
(stdout, stderr) = p.communicate()
- if hasattr(open, 'newlines'):
+ if hasattr(file, 'newlines'):
# Interpreter with universal newline support
self.assertEqual(stdout,
"line1\nline2\nline3\nline4\nline5\nline6")
diff --git a/Lib/test/test_sundry.py b/Lib/test/test_sundry.py
index af13684..f19467c 100644
--- a/Lib/test/test_sundry.py
+++ b/Lib/test/test_sundry.py
@@ -50,11 +50,7 @@ import pstats
import py_compile
import pydoc
import rexec
-try:
- import rlcompleter # not available on Windows
-except ImportError:
- if verbose:
- print "skipping rlcompleter"
+import rlcompleter
import sched
import smtplib
import sndhdr
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index cc71366..2d08f4d 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -30,7 +30,9 @@ class ResourceDenied(TestSkipped):
"""
verbose = 1 # Flag set to 0 by regrtest.py
-use_resources = None # Flag set to [] by regrtest.py
+use_resources = None # Flag set to [] by regrtest.py
+max_memuse = 0 # Disable bigmem tests (they will still be run with
+ # small sizes, to make sure they work.)
# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
@@ -250,6 +252,108 @@ def open_urlresource(url):
return open(fn)
#=======================================================================
+# Decorator for running a function in a different locale, correctly resetting
+# it afterwards.
+
+def run_with_locale(catstr, *locales):
+ def decorator(func):
+ def inner(*args, **kwds):
+ try:
+ import locale
+ category = getattr(locale, catstr)
+ orig_locale = locale.setlocale(category)
+ except AttributeError:
+ # if the test author gives us an invalid category string
+ raise
+ except:
+ # cannot retrieve original locale, so do nothing
+ locale = orig_locale = None
+ else:
+ for loc in locales:
+ try:
+ locale.setlocale(category, loc)
+ break
+ except:
+ pass
+
+ # now run the function, resetting the locale on exceptions
+ try:
+ return func(*args, **kwds)
+ finally:
+ if locale and orig_locale:
+ locale.setlocale(category, orig_locale)
+ inner.func_name = func.func_name
+ inner.__doc__ = func.__doc__
+ return inner
+ return decorator
+
+#=======================================================================
+# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
+
+# Some handy shorthands. Note that these are used for byte-limits as well
+# as size-limits, in the various bigmem tests
+_1M = 1024*1024
+_1G = 1024 * _1M
+_2G = 2 * _1G
+
+def set_memlimit(limit):
+ import re
+ global max_memuse
+ sizes = {
+ 'k': 1024,
+ 'm': _1M,
+ 'g': _1G,
+ 't': 1024*_1G,
+ }
+ m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+ re.IGNORECASE | re.VERBOSE)
+ if m is None:
+ raise ValueError('Invalid memory limit %r' % (limit,))
+ memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
+ if memlimit < 2.5*_1G:
+ raise ValueError('Memory limit %r too low to be useful' % (limit,))
+ max_memuse = memlimit
+
+def bigmemtest(minsize, memuse, overhead=5*_1M):
+ """Decorator for bigmem tests.
+
+ 'minsize' is the minimum useful size for the test (in arbitrary,
+ test-interpreted units.) 'memuse' is the number of 'bytes per size' for
+ the test, or a good estimate of it. 'overhead' specifies fixed overhead,
+ independant of the testsize, and defaults to 5Mb.
+
+ The decorator tries to guess a good value for 'size' and passes it to
+ the decorated test function. If minsize * memuse is more than the
+ allowed memory use (as defined by max_memuse), the test is skipped.
+ Otherwise, minsize is adjusted upward to use up to max_memuse.
+ """
+ def decorator(f):
+ def wrapper(self):
+ if not max_memuse:
+ # If max_memuse is 0 (the default),
+ # we still want to run the tests with size set to a few kb,
+ # to make sure they work. We still want to avoid using
+ # too much memory, though, but we do that noisily.
+ maxsize = 5147
+ self.failIf(maxsize * memuse + overhead > 20 * _1M)
+ else:
+ maxsize = int((max_memuse - overhead) / memuse)
+ if maxsize < minsize:
+ # Really ought to print 'test skipped' or something
+ if verbose:
+ sys.stderr.write("Skipping %s because of memory "
+ "constraint\n" % (f.__name__,))
+ return
+ # Try to keep some breathing room in memory use
+ maxsize = max(maxsize - 50 * _1M, minsize)
+ return f(self, maxsize)
+ wrapper.minsize = minsize
+ wrapper.memuse = memuse
+ wrapper.overhead = overhead
+ return wrapper
+ return decorator
+
+#=======================================================================
# Preliminary PyUNIT integration.
import unittest
diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py
index b61debf..dc7a16d 100644
--- a/Lib/test/test_syntax.py
+++ b/Lib/test/test_syntax.py
@@ -86,13 +86,16 @@ SyntaxError: can't assign to literal (<doctest test.test_syntax[11]>, line 1)
Traceback (most recent call last):
SyntaxError: can't assign to operator (<doctest test.test_syntax[12]>, line 1)
+>>> a if 1 else b = 1
+Traceback (most recent call last):
+SyntaxError: can't assign to conditional expression (<doctest test.test_syntax[13]>, line 1)
From compiler_complex_args():
>>> def f(None=1):
... pass
Traceback (most recent call last):
-SyntaxError: assignment to None (<doctest test.test_syntax[13]>, line 1)
+SyntaxError: assignment to None (<doctest test.test_syntax[14]>, line 1)
From ast_for_arguments():
@@ -100,22 +103,22 @@ From ast_for_arguments():
>>> def f(x, y=1, z):
... pass
Traceback (most recent call last):
-SyntaxError: non-default argument follows default argument (<doctest test.test_syntax[14]>, line 1)
+SyntaxError: non-default argument follows default argument (<doctest test.test_syntax[15]>, line 1)
>>> def f(x, None):
... pass
Traceback (most recent call last):
-SyntaxError: assignment to None (<doctest test.test_syntax[15]>, line 1)
+SyntaxError: assignment to None (<doctest test.test_syntax[16]>, line 1)
>>> def f(*None):
... pass
Traceback (most recent call last):
-SyntaxError: assignment to None (<doctest test.test_syntax[16]>, line 1)
+SyntaxError: assignment to None (<doctest test.test_syntax[17]>, line 1)
>>> def f(**None):
... pass
Traceback (most recent call last):
-SyntaxError: assignment to None (<doctest test.test_syntax[17]>, line 1)
+SyntaxError: assignment to None (<doctest test.test_syntax[18]>, line 1)
From ast_for_funcdef():
@@ -123,7 +126,7 @@ From ast_for_funcdef():
>>> def None(x):
... pass
Traceback (most recent call last):
-SyntaxError: assignment to None (<doctest test.test_syntax[18]>, line 1)
+SyntaxError: assignment to None (<doctest test.test_syntax[19]>, line 1)
From ast_for_call():
@@ -135,7 +138,7 @@ From ast_for_call():
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> f(x for x in L, 1)
Traceback (most recent call last):
-SyntaxError: Generator expression must be parenthesized if not sole argument (<doctest test.test_syntax[22]>, line 1)
+SyntaxError: Generator expression must be parenthesized if not sole argument (<doctest test.test_syntax[23]>, line 1)
>>> f((x for x in L), 1)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
@@ -167,7 +170,7 @@ SyntaxError: Generator expression must be parenthesized if not sole argument (<d
... i244, i245, i246, i247, i248, i249, i250, i251, i252,
... i253, i254, i255)
Traceback (most recent call last):
-SyntaxError: more than 255 arguments (<doctest test.test_syntax[24]>, line 1)
+SyntaxError: more than 255 arguments (<doctest test.test_syntax[25]>, line 1)
The actual error cases counts positional arguments, keyword arguments,
and generator expression arguments separately. This test combines the
@@ -201,37 +204,37 @@ three.
... (x for x in i244), i245, i246, i247, i248, i249, i250, i251,
... i252=1, i253=1, i254=1, i255=1)
Traceback (most recent call last):
-SyntaxError: more than 255 arguments (<doctest test.test_syntax[25]>, line 1)
+SyntaxError: more than 255 arguments (<doctest test.test_syntax[26]>, line 1)
>>> f(lambda x: x[0] = 3)
Traceback (most recent call last):
-SyntaxError: lambda cannot contain assignment (<doctest test.test_syntax[26]>, line 1)
+SyntaxError: lambda cannot contain assignment (<doctest test.test_syntax[27]>, line 1)
The grammar accepts any test (basically, any expression) in the
keyword slot of a call site. Test a few different options.
>>> f(x()=2)
Traceback (most recent call last):
-SyntaxError: keyword can't be an expression (<doctest test.test_syntax[27]>, line 1)
+SyntaxError: keyword can't be an expression (<doctest test.test_syntax[28]>, line 1)
>>> f(a or b=1)
Traceback (most recent call last):
-SyntaxError: keyword can't be an expression (<doctest test.test_syntax[28]>, line 1)
+SyntaxError: keyword can't be an expression (<doctest test.test_syntax[29]>, line 1)
>>> f(x.y=1)
Traceback (most recent call last):
-SyntaxError: keyword can't be an expression (<doctest test.test_syntax[29]>, line 1)
+SyntaxError: keyword can't be an expression (<doctest test.test_syntax[30]>, line 1)
From ast_for_expr_stmt():
>>> (x for x in x) += 1
Traceback (most recent call last):
-SyntaxError: augmented assignment to generator expression not possible (<doctest test.test_syntax[30]>, line 1)
+SyntaxError: augmented assignment to generator expression not possible (<doctest test.test_syntax[31]>, line 1)
>>> None += 1
Traceback (most recent call last):
-SyntaxError: assignment to None (<doctest test.test_syntax[31]>, line 1)
+SyntaxError: assignment to None (<doctest test.test_syntax[32]>, line 1)
>>> f() += 1
Traceback (most recent call last):
-SyntaxError: illegal expression for augmented assignment (<doctest test.test_syntax[32]>, line 1)
+SyntaxError: illegal expression for augmented assignment (<doctest test.test_syntax[33]>, line 1)
"""
import re
@@ -243,15 +246,18 @@ from test import test_support
class SyntaxTestCase(unittest.TestCase):
def _check_error(self, code, errtext,
- filename="<testcase>", mode="exec"):
+ filename="<testcase>", mode="exec", subclass=None):
"""Check that compiling code raises SyntaxError with errtext.
errtest is a regular expression that must be present in the
- test of the exception raised.
+ test of the exception raised. If subclass is specified it
+ is the expected subclass of SyntaxError (e.g. IndentationError).
"""
try:
compile(code, filename, mode)
except SyntaxError, err:
+ if subclass and not isinstance(err, subclass):
+ self.fail("SyntaxError is not a %s" % subclass.__name__)
mo = re.search(errtext, str(err))
if mo is None:
self.fail("SyntaxError did not contain '%r'" % (errtext,))
@@ -290,6 +296,22 @@ class SyntaxTestCase(unittest.TestCase):
:""")
self._check_error(source, "nested scope")
+ def test_unexpected_indent(self):
+ self._check_error("foo()\n bar()\n", "unexpected indent",
+ subclass=IndentationError)
+
+ def test_no_indent(self):
+ self._check_error("if 1:\nfoo()", "expected an indented block",
+ subclass=IndentationError)
+
+ def test_bad_outdent(self):
+ self._check_error("if 1:\n foo()\n bar()",
+ "unindent does not match .* level",
+ subclass=IndentationError)
+
+ def test_kwargs_last(self):
+ self._check_error("int(base=10, '2')", "non-keyword arg")
+
def test_main():
test_support.run_unittest(SyntaxTestCase)
from test import test_syntax
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index 98349c4..8ee0f41 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -2,6 +2,7 @@ import sys
import os
import shutil
import tempfile
+import StringIO
import unittest
import tarfile
@@ -25,7 +26,7 @@ def path(path):
testtar = path("testtar.tar")
tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir")
tempname = test_support.TESTFN
-membercount = 10
+membercount = 12
def tarname(comp=""):
if not comp:
@@ -86,7 +87,9 @@ class ReadTest(BaseTest):
if self.sep != "|":
filename = "0-REGTYPE-TEXT"
self.tar.extract(filename, dirname())
- lines1 = file(os.path.join(dirname(), filename), "rU").readlines()
+ f = open(os.path.join(dirname(), filename), "rU")
+ lines1 = f.readlines()
+ f.close()
lines2 = self.tar.extractfile(filename).readlines()
self.assert_(lines1 == lines2,
"_FileObject.readline() does not work correctly")
@@ -96,7 +99,9 @@ class ReadTest(BaseTest):
if self.sep != "|":
filename = "0-REGTYPE-TEXT"
self.tar.extract(filename, dirname())
- lines1 = file(os.path.join(dirname(), filename), "rU").readlines()
+ f = open(os.path.join(dirname(), filename), "rU")
+ lines1 = f.readlines()
+ f.close()
lines2 = [line for line in self.tar.extractfile(filename)]
self.assert_(lines1 == lines2,
"ExFileObject iteration does not work correctly")
@@ -107,7 +112,9 @@ class ReadTest(BaseTest):
if self.sep != "|":
filename = "0-REGTYPE"
self.tar.extract(filename, dirname())
- data = file(os.path.join(dirname(), filename), "rb").read()
+ f = open(os.path.join(dirname(), filename), "rb")
+ data = f.read()
+ f.close()
tarinfo = self.tar.getmember(filename)
fobj = self.tar.extractfile(tarinfo)
@@ -155,7 +162,7 @@ class ReadTest(BaseTest):
tarinfo = tarfile.TarInfo("directory/")
tarinfo.type = tarfile.REGTYPE
- fobj = file(filename, "w")
+ fobj = open(filename, "w")
fobj.write(tarinfo.tobuf())
fobj.close()
@@ -209,8 +216,21 @@ class ReadStreamTest(ReadTest):
self.assert_(v2 is not None, "stream.extractfile() failed")
self.assert_(v1.read() == v2.read(), "stream extraction failed")
+ tar.close()
stream.close()
+class ReadDetectTest(ReadTest):
+
+ def setUp(self):
+ self.tar = tarfile.open(tarname(self.comp), self.mode)
+
+class ReadDetectFileobjTest(ReadTest):
+
+ def setUp(self):
+ name = tarname(self.comp)
+ self.tar = tarfile.open(name, mode=self.mode,
+ fileobj=open(name, "rb"))
+
class ReadAsteriskTest(ReadTest):
def setUp(self):
@@ -254,7 +274,7 @@ class WriteTest(BaseTest):
if not tarinfo.isreg():
continue
f = self.src.extractfile(tarinfo)
- if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
+ if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME and "/" not in tarinfo.name:
self.assertRaises(ValueError, self.dst.addfile,
tarinfo, f)
else:
@@ -273,15 +293,22 @@ class WriteSize0Test(BaseTest):
def test_file(self):
path = os.path.join(self.tmpdir, "file")
- file(path, "w")
+ f = open(path, "w")
+ f.close()
tarinfo = self.dst.gettarinfo(path)
self.assertEqual(tarinfo.size, 0)
- file(path, "w").write("aaa")
+ f = open(path, "w")
+ f.write("aaa")
+ f.close()
tarinfo = self.dst.gettarinfo(path)
self.assertEqual(tarinfo.size, 3)
def test_directory(self):
path = os.path.join(self.tmpdir, "directory")
+ if os.path.exists(path):
+ # This shouldn't be necessary, but is <wink> if a previous
+ # run was killed in mid-stream.
+ shutil.rmtree(path)
os.mkdir(path)
tarinfo = self.dst.gettarinfo(path)
self.assertEqual(tarinfo.size, 0)
@@ -385,6 +412,52 @@ class WriteGNULongTest(unittest.TestCase):
self._test(("longnam/" * 127) + "longname_",
("longlnk/" * 127) + "longlink_")
+class ReadGNULongTest(unittest.TestCase):
+
+ def setUp(self):
+ self.tar = tarfile.open(tarname())
+
+ def tearDown(self):
+ self.tar.close()
+
+ def test_1471427(self):
+ """Test reading of longname (bug #1471427).
+ """
+ name = "test/" * 20 + "0-REGTYPE"
+ try:
+ tarinfo = self.tar.getmember(name)
+ except KeyError:
+ tarinfo = None
+ self.assert_(tarinfo is not None, "longname not found")
+ self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
+
+ def test_read_name(self):
+ name = ("0-LONGNAME-" * 10)[:101]
+ try:
+ tarinfo = self.tar.getmember(name)
+ except KeyError:
+ tarinfo = None
+ self.assert_(tarinfo is not None, "longname not found")
+
+ def test_read_link(self):
+ link = ("1-LONGLINK-" * 10)[:101]
+ name = ("0-LONGNAME-" * 10)[:101]
+ try:
+ tarinfo = self.tar.getmember(link)
+ except KeyError:
+ tarinfo = None
+ self.assert_(tarinfo is not None, "longlink not found")
+ self.assert_(tarinfo.linkname == name, "linkname wrong")
+
+ def test_truncated_longname(self):
+ f = open(tarname())
+ fobj = StringIO.StringIO(f.read(1024))
+ f.close()
+ tar = tarfile.open(name="foo.tar", fileobj=fobj)
+ self.assert_(len(tar.getmembers()) == 0, "")
+ tar.close()
+
+
class ExtractHardlinkTest(BaseTest):
def test_hardlink(self):
@@ -420,7 +493,9 @@ class CreateHardlinkTest(BaseTest):
if os.path.exists(self.bar):
os.remove(self.bar)
- file(self.foo, "w").write("foo")
+ f = open(self.foo, "w")
+ f.write("foo")
+ f.close()
self.tar.add(self.foo)
def test_add_twice(self):
@@ -459,6 +534,10 @@ class WriteTestGzip(WriteTest):
comp = "gz"
class WriteStreamTestGzip(WriteStreamTest):
comp = "gz"
+class ReadDetectTestGzip(ReadDetectTest):
+ comp = "gz"
+class ReadDetectFileobjTestGzip(ReadDetectFileobjTest):
+ comp = "gz"
class ReadAsteriskTestGzip(ReadAsteriskTest):
comp = "gz"
class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest):
@@ -482,6 +561,10 @@ if bz2:
comp = "bz2"
class WriteStreamTestBzip2(WriteStreamTestGzip):
comp = "bz2"
+ class ReadDetectTestBzip2(ReadDetectTest):
+ comp = "bz2"
+ class ReadDetectFileobjTestBzip2(ReadDetectFileobjTest):
+ comp = "bz2"
class ReadAsteriskTestBzip2(ReadAsteriskTest):
comp = "bz2"
class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest):
@@ -495,23 +578,34 @@ if not gzip:
del WriteStreamTestGzip
def test_main():
+ # Create archive.
+ f = open(tarname(), "rb")
+ fguts = f.read()
+ f.close()
if gzip:
# create testtar.tar.gz
- gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read())
+ tar = gzip.open(tarname("gz"), "wb")
+ tar.write(fguts)
+ tar.close()
if bz2:
# create testtar.tar.bz2
- bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read())
+ tar = bz2.BZ2File(tarname("bz2"), "wb")
+ tar.write(fguts)
+ tar.close()
tests = [
FileModeTest,
ReadTest,
ReadStreamTest,
+ ReadDetectTest,
+ ReadDetectFileobjTest,
ReadAsteriskTest,
ReadStreamAsteriskTest,
WriteTest,
WriteSize0Test,
WriteStreamTest,
WriteGNULongTest,
+ ReadGNULongTest,
]
if hasattr(os, "link"):
@@ -522,6 +616,7 @@ def test_main():
tests.extend([
ReadTestGzip, ReadStreamTestGzip,
WriteTestGzip, WriteStreamTestGzip,
+ ReadDetectTestGzip, ReadDetectFileobjTestGzip,
ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip
])
@@ -529,6 +624,7 @@ def test_main():
tests.extend([
ReadTestBzip2, ReadStreamTestBzip2,
WriteTestBzip2, WriteStreamTestBzip2,
+ ReadDetectTestBzip2, ReadDetectFileobjTestBzip2,
ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2
])
try:
diff --git a/Lib/test/test_tcl.py b/Lib/test/test_tcl.py
index 2eeabc1..e3fbf98 100644
--- a/Lib/test/test_tcl.py
+++ b/Lib/test/test_tcl.py
@@ -124,6 +124,7 @@ class TclTest(unittest.TestCase):
self.assertRaises(TclError,tcl.winfo_geometry)
tcl.loadtk()
self.assertEqual('1x1+0+0', tcl.winfo_geometry())
+ tcl.destroy()
def testLoadTkFailure(self):
import os
diff --git a/Lib/test/test_threaded_import.py b/Lib/test/test_threaded_import.py
index dc27d4e..0642d25 100644
--- a/Lib/test/test_threaded_import.py
+++ b/Lib/test/test_threaded_import.py
@@ -6,7 +6,7 @@
# randrange, and then Python hangs.
import thread
-from test.test_support import verbose, TestSkipped
+from test.test_support import verbose, TestSkipped, TestFailed
critical_section = thread.allocate_lock()
done = thread.allocate_lock()
@@ -25,6 +25,23 @@ def task():
if finished:
done.release()
+def test_import_hangers():
+ import sys
+ if verbose:
+ print "testing import hangers ...",
+
+ from test import threaded_import_hangers
+
+ try:
+ if threaded_import_hangers.errors:
+ raise TestFailed(threaded_import_hangers.errors)
+ elif verbose:
+ print "OK."
+ finally:
+ # In case this test is run again, make sure the helper module
+ # gets loaded from scratch again.
+ del sys.modules['test.threaded_import_hangers']
+
# Tricky: When regrtest imports this module, the thread running regrtest
# grabs the import lock and won't let go of it until this module returns.
# All other threads attempting an import hang for the duration. Since
@@ -53,5 +70,7 @@ def test_main(): # magic name! see above
print "OK."
done.release()
+ test_import_hangers()
+
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_traceback.py b/Lib/test/test_traceback.py
index 22c0456..1b59f98 100644
--- a/Lib/test/test_traceback.py
+++ b/Lib/test/test_traceback.py
@@ -103,6 +103,12 @@ def test():
import sys
sys.exc_traceback.__members__
+ def test_base_exception(self):
+ # Test that exceptions derived from BaseException are formatted right
+ e = KeyboardInterrupt()
+ lst = traceback.format_exception_only(e.__class__, e)
+ self.assertEqual(lst, ['KeyboardInterrupt\n'])
+
def test_main():
run_unittest(TracebackCases)
diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py
index c7113b5..34f9371 100644
--- a/Lib/test/test_unicode.py
+++ b/Lib/test/test_unicode.py
@@ -411,19 +411,10 @@ class UnicodeTest(
return u'\u1234'
self.assertEqual('%s' % Wrapper(), u'\u1234')
+ @test_support.run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
def test_format_float(self):
- try:
- import locale
- orig_locale = locale.setlocale(locale.LC_ALL)
- locale.setlocale(locale.LC_ALL, 'de_DE')
- except (ImportError, locale.Error):
- return # skip if we can't set locale
-
- try:
- # should not format with a comma, but always with C locale
- self.assertEqual(u'1.0', u'%.1f' % 1.0)
- finally:
- locale.setlocale(locale.LC_ALL, orig_locale)
+ # should not format with a comma, but always with C locale
+ self.assertEqual(u'1.0', u'%.1f' % 1.0)
def test_constructor(self):
# unicode(obj) tests (this maps to PyObject_Unicode() at C level)
diff --git a/Lib/test/test_unicodedata.py b/Lib/test/test_unicodedata.py
index f84caad..0023bf4 100644
--- a/Lib/test/test_unicodedata.py
+++ b/Lib/test/test_unicodedata.py
@@ -6,7 +6,7 @@
"""#"
import unittest, test.test_support
-import sha
+import hashlib
encoding = 'utf-8'
@@ -16,10 +16,10 @@ encoding = 'utf-8'
class UnicodeMethodsTest(unittest.TestCase):
# update this, if the database changes
- expectedchecksum = 'a6555cd209d960dcfa17bfdce0c96d91cfa9a9ba'
+ expectedchecksum = 'c198ed264497f108434b3f576d4107237221cc8a'
def test_method_checksum(self):
- h = sha.sha()
+ h = hashlib.sha1()
for i in range(65536):
char = unichr(i)
data = [
@@ -75,11 +75,11 @@ class UnicodeDatabaseTest(unittest.TestCase):
class UnicodeFunctionsTest(UnicodeDatabaseTest):
# update this, if the database changes
- expectedchecksum = 'b45b79f3203ee1a896d9b5655484adaff5d4964b'
+ expectedchecksum = '4e389f97e9f88b8b7ab743121fd643089116f9f2'
def test_function_checksum(self):
data = []
- h = sha.sha()
+ h = hashlib.sha1()
for i in range(0x10000):
char = unichr(i)
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 64a2ee9..c8f19bc 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -10,10 +10,7 @@ from urllib2 import Request, OpenerDirector
# XXX
# Request
# CacheFTPHandler (hard to write)
-# parse_keqv_list, parse_http_list (I'm leaving this for Anthony Baxter
-# and Greg Stein, since they're doing Digest Authentication)
-# Authentication stuff (ditto)
-# CustomProxy, CustomProxyHandler
+# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
class TrivialTests(unittest.TestCase):
def test_trivial(self):
@@ -49,6 +46,70 @@ class TrivialTests(unittest.TestCase):
self.assertEquals(urllib2.parse_http_list(string), list)
+def test_password_manager(self):
+ """
+ >>> mgr = urllib2.HTTPPasswordMgr()
+ >>> add = mgr.add_password
+ >>> add("Some Realm", "http://example.com/", "joe", "password")
+ >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
+ >>> add("c", "http://example.com/foo", "foo", "ni")
+ >>> add("c", "http://example.com/bar", "bar", "nini")
+ >>> add("b", "http://example.com/", "first", "blah")
+ >>> add("b", "http://example.com/", "second", "spam")
+ >>> add("a", "http://example.com", "1", "a")
+ >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
+ >>> add("Some Realm", "d.example.com", "4", "d")
+ >>> add("Some Realm", "e.example.com:3128", "5", "e")
+
+ >>> mgr.find_user_password("Some Realm", "example.com")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com/")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
+ ('joe', 'password')
+ >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
+ ('joe', 'password')
+ >>> mgr.find_user_password("c", "http://example.com/foo")
+ ('foo', 'ni')
+ >>> mgr.find_user_password("c", "http://example.com/bar")
+ ('bar', 'nini')
+
+ Currently, we use the highest-level path where more than one match:
+
+ >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
+ ('joe', 'password')
+
+ Use latest add_password() in case of conflict:
+
+ >>> mgr.find_user_password("b", "http://example.com/")
+ ('second', 'spam')
+
+ No special relationship between a.example.com and example.com:
+
+ >>> mgr.find_user_password("a", "http://example.com/")
+ ('1', 'a')
+ >>> mgr.find_user_password("a", "http://a.example.com/")
+ (None, None)
+
+ Ports:
+
+ >>> mgr.find_user_password("Some Realm", "c.example.com")
+ (None, None)
+ >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
+ ('3', 'c')
+ >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
+ ('3', 'c')
+ >>> mgr.find_user_password("Some Realm", "d.example.com")
+ ('4', 'd')
+ >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
+ ('5', 'e')
+
+ """
+ pass
+
+
class MockOpener:
addheaders = []
def open(self, req, data=None):
@@ -89,6 +150,8 @@ class FakeMethod:
return self.handle(self.meth_name, self.action, *args)
class MockHandler:
+ # useful for testing handler machinery
+ # see add_ordered_mock_handlers() docstring
handler_order = 500
def __init__(self, methods):
self._define_methods(methods)
@@ -161,6 +224,50 @@ def add_ordered_mock_handlers(opener, meth_spec):
opener.add_handler(h)
return handlers
+def build_test_opener(*handler_instances):
+ opener = OpenerDirector()
+ for h in handler_instances:
+ opener.add_handler(h)
+ return opener
+
+class MockHTTPHandler(urllib2.BaseHandler):
+ # useful for testing redirections and auth
+ # sends supplied headers and code as first response
+ # sends 200 OK as second response
+ def __init__(self, code, headers):
+ self.code = code
+ self.headers = headers
+ self.reset()
+ def reset(self):
+ self._count = 0
+ self.requests = []
+ def http_open(self, req):
+ import mimetools, httplib, copy
+ from StringIO import StringIO
+ self.requests.append(copy.deepcopy(req))
+ if self._count == 0:
+ self._count = self._count + 1
+ name = httplib.responses[self.code]
+ msg = mimetools.Message(StringIO(self.headers))
+ return self.parent.error(
+ "http", req, MockFile(), self.code, name, msg)
+ else:
+ self.req = req
+ msg = mimetools.Message(StringIO("\r\n\r\n"))
+ return MockResponse(200, "OK", msg, "", req.get_full_url())
+
+class MockPasswordManager:
+ def add_password(self, realm, uri, user, password):
+ self.realm = realm
+ self.url = uri
+ self.user = user
+ self.password = password
+ def find_user_password(self, realm, authuri):
+ self.target_realm = realm
+ self.target_url = authuri
+ return self.user, self.password
+
+
class OpenerDirectorTests(unittest.TestCase):
def test_handled(self):
@@ -612,33 +719,18 @@ class HandlerTests(unittest.TestCase):
urllib2.HTTPRedirectHandler.max_redirections)
def test_cookie_redirect(self):
- class MockHTTPHandler(urllib2.HTTPHandler):
- def __init__(self): self._count = 0
- def http_open(self, req):
- import mimetools
- from StringIO import StringIO
- if self._count == 0:
- self._count = self._count + 1
- msg = mimetools.Message(
- StringIO("Location: http://www.cracker.com/\r\n\r\n"))
- return self.parent.error(
- "http", req, MockFile(), 302, "Found", msg)
- else:
- self.req = req
- msg = mimetools.Message(StringIO("\r\n\r\n"))
- return MockResponse(200, "OK", msg, "", req.get_full_url())
# cookies shouldn't leak into redirected requests
from cookielib import CookieJar
- from urllib2 import build_opener, HTTPHandler, HTTPError, \
- HTTPCookieProcessor
from test.test_cookielib import interact_netscape
cj = CookieJar()
interact_netscape(cj, "http://www.example.com/", "spam=eggs")
- hh = MockHTTPHandler()
- cp = HTTPCookieProcessor(cj)
- o = build_opener(hh, cp)
+ hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
+ hdeh = urllib2.HTTPDefaultErrorHandler()
+ hrh = urllib2.HTTPRedirectHandler()
+ cp = urllib2.HTTPCookieProcessor(cj)
+ o = build_test_opener(hh, hdeh, hrh, cp)
o.open("http://www.example.com/")
self.assert_(not hh.req.has_header("Cookie"))
@@ -659,6 +751,92 @@ class HandlerTests(unittest.TestCase):
self.assertEqual([(handlers[0], "http_open")],
[tup[0:2] for tup in o.calls])
+ def test_basic_auth(self):
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
+ realm = "ACME Widget Store"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, auth_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected",
+ )
+
+ def test_proxy_basic_auth(self):
+ opener = OpenerDirector()
+ ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
+ opener.add_handler(ph)
+ password_manager = MockPasswordManager()
+ auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
+ realm = "ACME Networks"
+ http_handler = MockHTTPHandler(
+ 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com:3128/protected",
+ "proxy.example.com:3128",
+ )
+
+ def test_basic_and_digest_auth_handlers(self):
+ # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
+ # response (http://python.org/sf/1479302), where it should instead
+ # return None to allow another handler (especially
+ # HTTPBasicAuthHandler) to handle the response.
+ class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
+ handler_order = 400 # strictly before HTTPBasicAuthHandler
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ digest_handler = TestDigestAuthHandler(password_manager)
+ basic_handler = urllib2.HTTPBasicAuthHandler(password_manager)
+ opener.add_handler(digest_handler)
+ realm = "ACME Networks"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, basic_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected",
+ )
+
+ def _test_basic_auth(self, opener, auth_handler, auth_header,
+ realm, http_handler, password_manager,
+ request_url, protected_url):
+ import base64, httplib
+ user, password = "wile", "coyote"
+ opener.add_handler(auth_handler)
+ opener.add_handler(http_handler)
+
+ # .add_password() fed through to password manager
+ auth_handler.add_password(realm, request_url, user, password)
+ self.assertEqual(realm, password_manager.realm)
+ self.assertEqual(request_url, password_manager.url)
+ self.assertEqual(user, password_manager.user)
+ self.assertEqual(password, password_manager.password)
+
+ r = opener.open(request_url)
+
+ # should have asked the password manager for the username/password
+ self.assertEqual(password_manager.target_realm, realm)
+ self.assertEqual(password_manager.target_url, protected_url)
+
+ # expect one request without authorization, then one with
+ self.assertEqual(len(http_handler.requests), 2)
+ self.assertFalse(http_handler.requests[0].has_header(auth_header))
+ userpass = '%s:%s' % (user, password)
+ auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
+ self.assertEqual(http_handler.requests[1].get_header(auth_header),
+ auth_hdr_value)
+
+ # if the password manager can't find a password, the handler won't
+ # handle the HTTP auth error
+ password_manager.user = password_manager.password = None
+ http_handler.reset()
+ r = opener.open(request_url)
+ self.assertEqual(len(http_handler.requests), 1)
+ self.assertFalse(http_handler.requests[0].has_header(auth_header))
+
class MiscTests(unittest.TestCase):
@@ -700,157 +878,15 @@ class MiscTests(unittest.TestCase):
else:
self.assert_(False)
-class NetworkTests(unittest.TestCase):
- def setUp(self):
- if 0: # for debugging
- import logging
- logger = logging.getLogger("test_urllib2")
- logger.addHandler(logging.StreamHandler())
-
- def test_range (self):
- req = urllib2.Request("http://www.python.org",
- headers={'Range': 'bytes=20-39'})
- result = urllib2.urlopen(req)
- data = result.read()
- self.assertEqual(len(data), 20)
-
- # XXX The rest of these tests aren't very good -- they don't check much.
- # They do sometimes catch some major disasters, though.
-
- def test_ftp(self):
- urls = [
- 'ftp://www.python.org/pub/python/misc/sousa.au',
- 'ftp://www.python.org/pub/tmp/blat',
- 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
- '/research-reports/00README-Legal-Rules-Regs',
- ]
- self._test_urls(urls, self._extra_handlers())
-
- def test_gopher(self):
- import warnings
- warnings.filterwarnings("ignore",
- "the gopherlib module is deprecated",
- DeprecationWarning,
- "urllib2$")
- urls = [
- # Thanks to Fred for finding these!
- 'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex',
- 'gopher://gopher.vt.edu:10010/10/33',
- ]
- self._test_urls(urls, self._extra_handlers())
-
- def test_file(self):
- TESTFN = test_support.TESTFN
- f = open(TESTFN, 'w')
- try:
- f.write('hi there\n')
- f.close()
- urls = [
- 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
-
- # XXX bug, should raise URLError
- #('file://nonsensename/etc/passwd', None, urllib2.URLError)
- ('file://nonsensename/etc/passwd', None, (OSError, socket.error))
- ]
- self._test_urls(urls, self._extra_handlers())
- finally:
- os.remove(TESTFN)
-
- def test_http(self):
- urls = [
- 'http://www.espn.com/', # redirect
- 'http://www.python.org/Spanish/Inquistion/',
- ('http://www.python.org/cgi-bin/faqw.py',
- 'query=pythonistas&querytype=simple&casefold=yes&req=search', None),
- 'http://www.python.org/',
- ]
- self._test_urls(urls, self._extra_handlers())
-
- # XXX Following test depends on machine configurations that are internal
- # to CNRI. Need to set up a public server with the right authentication
- # configuration for test purposes.
-
-## def test_cnri(self):
-## if socket.gethostname() == 'bitdiddle':
-## localhost = 'bitdiddle.cnri.reston.va.us'
-## elif socket.gethostname() == 'bitdiddle.concentric.net':
-## localhost = 'localhost'
-## else:
-## localhost = None
-## if localhost is not None:
-## urls = [
-## 'file://%s/etc/passwd' % localhost,
-## 'http://%s/simple/' % localhost,
-## 'http://%s/digest/' % localhost,
-## 'http://%s/not/found.h' % localhost,
-## ]
-
-## bauth = HTTPBasicAuthHandler()
-## bauth.add_password('basic_test_realm', localhost, 'jhylton',
-## 'password')
-## dauth = HTTPDigestAuthHandler()
-## dauth.add_password('digest_test_realm', localhost, 'jhylton',
-## 'password')
-
-## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
-
- def _test_urls(self, urls, handlers):
- import socket
- import time
- import logging
- debug = logging.getLogger("test_urllib2").debug
-
- urllib2.install_opener(urllib2.build_opener(*handlers))
-
- for url in urls:
- if isinstance(url, tuple):
- url, req, expected_err = url
- else:
- req = expected_err = None
- debug(url)
- try:
- f = urllib2.urlopen(url, req)
- except (IOError, socket.error, OSError), err:
- debug(err)
- if expected_err:
- self.assert_(isinstance(err, expected_err))
- else:
- buf = f.read()
- f.close()
- debug("read %d bytes" % len(buf))
- debug("******** next url coming up...")
- time.sleep(0.1)
-
- def _extra_handlers(self):
- handlers = []
-
- handlers.append(urllib2.GopherHandler)
-
- cfh = urllib2.CacheFTPHandler()
- cfh.setTimeout(1)
- handlers.append(cfh)
-
-## # XXX try out some custom proxy objects too!
-## def at_cnri(req):
-## host = req.get_host()
-## debug(host)
-## if host[-18:] == '.cnri.reston.va.us':
-## return True
-## p = CustomProxy('http', at_cnri, 'proxy.cnri.reston.va.us')
-## ph = CustomProxyHandler(p)
-## handlers.append(ph)
-
- return handlers
-
def test_main(verbose=None):
+ from test import test_urllib2
+ test_support.run_doctest(test_urllib2, verbose)
test_support.run_doctest(urllib2, verbose)
tests = (TrivialTests,
OpenerDirectorTests,
HandlerTests,
MiscTests)
- if test_support.is_resource_enabled('network'):
- tests += (NetworkTests,)
test_support.run_unittest(*tests)
if __name__ == "__main__":
diff --git a/Lib/test/test_urllib2net.py b/Lib/test/test_urllib2net.py
index 3c23246..dc3d36d 100644
--- a/Lib/test/test_urllib2net.py
+++ b/Lib/test/test_urllib2net.py
@@ -2,6 +2,7 @@
import unittest
from test import test_support
+from test.test_urllib2 import sanepathname2url
import socket
import urllib2
@@ -23,6 +24,46 @@ class URLTimeoutTest(unittest.TestCase):
f = urllib2.urlopen("http://www.python.org/")
x = f.read()
+
+class AuthTests(unittest.TestCase):
+ """Tests urllib2 authentication features."""
+
+## Disabled at the moment since there is no page under python.org which
+## could be used to HTTP authentication.
+#
+# def test_basic_auth(self):
+# import httplib
+#
+# test_url = "http://www.python.org/test/test_urllib2/basic_auth"
+# test_hostport = "www.python.org"
+# test_realm = 'Test Realm'
+# test_user = 'test.test_urllib2net'
+# test_password = 'blah'
+#
+# # failure
+# try:
+# urllib2.urlopen(test_url)
+# except urllib2.HTTPError, exc:
+# self.assertEqual(exc.code, 401)
+# else:
+# self.fail("urlopen() should have failed with 401")
+#
+# # success
+# auth_handler = urllib2.HTTPBasicAuthHandler()
+# auth_handler.add_password(test_realm, test_hostport,
+# test_user, test_password)
+# opener = urllib2.build_opener(auth_handler)
+# f = opener.open('http://localhost/')
+# response = urllib2.urlopen("http://www.python.org/")
+#
+# # The 'userinfo' URL component is deprecated by RFC 3986 for security
+# # reasons, let's not implement it! (it's already implemented for proxy
+# # specification strings (that is, URLs or authorities specifying a
+# # proxy), so we must keep that)
+# self.assertRaises(httplib.InvalidURL,
+# urllib2.urlopen, "http://evil:thing@example.com")
+
+
class urlopenNetworkTests(unittest.TestCase):
"""Tests urllib2.urlopen using the network.
@@ -84,9 +125,145 @@ class urlopenNetworkTests(unittest.TestCase):
# urllib2.urlopen, "http://www.sadflkjsasadf.com/")
urllib2.urlopen, "http://www.python.invalid/")
+
+class OtherNetworkTests(unittest.TestCase):
+ def setUp(self):
+ if 0: # for debugging
+ import logging
+ logger = logging.getLogger("test_urllib2net")
+ logger.addHandler(logging.StreamHandler())
+
+ def test_range (self):
+ req = urllib2.Request("http://www.python.org",
+ headers={'Range': 'bytes=20-39'})
+ result = urllib2.urlopen(req)
+ data = result.read()
+ self.assertEqual(len(data), 20)
+
+ # XXX The rest of these tests aren't very good -- they don't check much.
+ # They do sometimes catch some major disasters, though.
+
+ def test_ftp(self):
+ urls = [
+ 'ftp://www.python.org/pub/python/misc/sousa.au',
+ 'ftp://www.python.org/pub/tmp/blat',
+ 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
+ '/research-reports/00README-Legal-Rules-Regs',
+ ]
+ self._test_urls(urls, self._extra_handlers())
+
+ def test_gopher(self):
+ import warnings
+ warnings.filterwarnings("ignore",
+ "the gopherlib module is deprecated",
+ DeprecationWarning,
+ "urllib2$")
+ urls = [
+ # Thanks to Fred for finding these!
+ 'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex',
+ 'gopher://gopher.vt.edu:10010/10/33',
+ ]
+ self._test_urls(urls, self._extra_handlers())
+
+ def test_file(self):
+ TESTFN = test_support.TESTFN
+ f = open(TESTFN, 'w')
+ try:
+ f.write('hi there\n')
+ f.close()
+ urls = [
+ 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
+
+ # XXX bug, should raise URLError
+ #('file://nonsensename/etc/passwd', None, urllib2.URLError)
+ ('file://nonsensename/etc/passwd', None, (OSError, socket.error))
+ ]
+ self._test_urls(urls, self._extra_handlers())
+ finally:
+ os.remove(TESTFN)
+
+ def test_http(self):
+ urls = [
+ 'http://www.espn.com/', # redirect
+ 'http://www.python.org/Spanish/Inquistion/',
+ ('http://www.python.org/cgi-bin/faqw.py',
+ 'query=pythonistas&querytype=simple&casefold=yes&req=search', None),
+ 'http://www.python.org/',
+ ]
+ self._test_urls(urls, self._extra_handlers())
+
+ # XXX Following test depends on machine configurations that are internal
+ # to CNRI. Need to set up a public server with the right authentication
+ # configuration for test purposes.
+
+## def test_cnri(self):
+## if socket.gethostname() == 'bitdiddle':
+## localhost = 'bitdiddle.cnri.reston.va.us'
+## elif socket.gethostname() == 'bitdiddle.concentric.net':
+## localhost = 'localhost'
+## else:
+## localhost = None
+## if localhost is not None:
+## urls = [
+## 'file://%s/etc/passwd' % localhost,
+## 'http://%s/simple/' % localhost,
+## 'http://%s/digest/' % localhost,
+## 'http://%s/not/found.h' % localhost,
+## ]
+
+## bauth = HTTPBasicAuthHandler()
+## bauth.add_password('basic_test_realm', localhost, 'jhylton',
+## 'password')
+## dauth = HTTPDigestAuthHandler()
+## dauth.add_password('digest_test_realm', localhost, 'jhylton',
+## 'password')
+
+## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
+
+ def _test_urls(self, urls, handlers):
+ import socket
+ import time
+ import logging
+ debug = logging.getLogger("test_urllib2").debug
+
+ urllib2.install_opener(urllib2.build_opener(*handlers))
+
+ for url in urls:
+ if isinstance(url, tuple):
+ url, req, expected_err = url
+ else:
+ req = expected_err = None
+ debug(url)
+ try:
+ f = urllib2.urlopen(url, req)
+ except (IOError, socket.error, OSError), err:
+ debug(err)
+ if expected_err:
+ self.assert_(isinstance(err, expected_err))
+ else:
+ buf = f.read()
+ f.close()
+ debug("read %d bytes" % len(buf))
+ debug("******** next url coming up...")
+ time.sleep(0.1)
+
+ def _extra_handlers(self):
+ handlers = []
+
+ handlers.append(urllib2.GopherHandler)
+
+ cfh = urllib2.CacheFTPHandler()
+ cfh.setTimeout(1)
+ handlers.append(cfh)
+
+ return handlers
+
+
+
def test_main():
test_support.requires("network")
- test_support.run_unittest(URLTimeoutTest, urlopenNetworkTests)
+ test_support.run_unittest(URLTimeoutTest, urlopenNetworkTests,
+ AuthTests, OtherNetworkTests)
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py
index 9634ef2..392e5fa 100644
--- a/Lib/test/test_weakref.py
+++ b/Lib/test/test_weakref.py
@@ -769,10 +769,54 @@ class MappingTestCase(TestBase):
dict, objects = self.make_weak_keyed_dict()
self.check_iters(dict)
+ # Test keyrefs()
+ refs = dict.keyrefs()
+ self.assertEqual(len(refs), len(objects))
+ objects2 = list(objects)
+ for wr in refs:
+ ob = wr()
+ self.assert_(dict.has_key(ob))
+ self.assert_(ob in dict)
+ self.assertEqual(ob.arg, dict[ob])
+ objects2.remove(ob)
+ self.assertEqual(len(objects2), 0)
+
+ # Test iterkeyrefs()
+ objects2 = list(objects)
+ self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
+ for wr in dict.iterkeyrefs():
+ ob = wr()
+ self.assert_(dict.has_key(ob))
+ self.assert_(ob in dict)
+ self.assertEqual(ob.arg, dict[ob])
+ objects2.remove(ob)
+ self.assertEqual(len(objects2), 0)
+
def test_weak_valued_iters(self):
dict, objects = self.make_weak_valued_dict()
self.check_iters(dict)
+ # Test valuerefs()
+ refs = dict.valuerefs()
+ self.assertEqual(len(refs), len(objects))
+ objects2 = list(objects)
+ for wr in refs:
+ ob = wr()
+ self.assertEqual(ob, dict[ob.arg])
+ self.assertEqual(ob.arg, dict[ob.arg].arg)
+ objects2.remove(ob)
+ self.assertEqual(len(objects2), 0)
+
+ # Test itervaluerefs()
+ objects2 = list(objects)
+ self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
+ for wr in dict.itervaluerefs():
+ ob = wr()
+ self.assertEqual(ob, dict[ob.arg])
+ self.assertEqual(ob.arg, dict[ob.arg].arg)
+ objects2.remove(ob)
+ self.assertEqual(len(objects2), 0)
+
def check_iters(self, dict):
# item iterator:
items = dict.items()
diff --git a/Lib/test/test_with.py b/Lib/test/test_with.py
index 48e00f4..5750508 100644
--- a/Lib/test/test_with.py
+++ b/Lib/test/test_with.py
@@ -17,15 +17,10 @@ from test.test_support import run_unittest
class MockContextManager(GeneratorContextManager):
def __init__(self, gen):
GeneratorContextManager.__init__(self, gen)
- self.context_called = False
self.enter_called = False
self.exit_called = False
self.exit_args = None
- def __context__(self):
- self.context_called = True
- return GeneratorContextManager.__context__(self)
-
def __enter__(self):
self.enter_called = True
return GeneratorContextManager.__enter__(self)
@@ -33,7 +28,8 @@ class MockContextManager(GeneratorContextManager):
def __exit__(self, type, value, traceback):
self.exit_called = True
self.exit_args = (type, value, traceback)
- return GeneratorContextManager.__exit__(self, type, value, traceback)
+ return GeneratorContextManager.__exit__(self, type,
+ value, traceback)
def mock_contextmanager(func):
@@ -60,21 +56,17 @@ def mock_contextmanager_generator():
class Nested(object):
- def __init__(self, *contexts):
- self.contexts = contexts
+ def __init__(self, *managers):
+ self.managers = managers
self.entered = None
- def __context__(self):
- return self
-
def __enter__(self):
if self.entered is not None:
raise RuntimeError("Context is not reentrant")
self.entered = deque()
vars = []
try:
- for context in self.contexts:
- mgr = context.__context__()
+ for mgr in self.managers:
vars.append(mgr.__enter__())
self.entered.appendleft(mgr)
except:
@@ -99,17 +91,12 @@ class Nested(object):
class MockNested(Nested):
- def __init__(self, *contexts):
- Nested.__init__(self, *contexts)
- self.context_called = False
+ def __init__(self, *managers):
+ Nested.__init__(self, *managers)
self.enter_called = False
self.exit_called = False
self.exit_args = None
- def __context__(self):
- self.context_called = True
- return Nested.__context__(self)
-
def __enter__(self):
self.enter_called = True
return Nested.__enter__(self)
@@ -126,24 +113,8 @@ class FailureTestCase(unittest.TestCase):
with foo: pass
self.assertRaises(NameError, fooNotDeclared)
- def testContextAttributeError(self):
- class LacksContext(object):
- def __enter__(self):
- pass
-
- def __exit__(self, type, value, traceback):
- pass
-
- def fooLacksContext():
- foo = LacksContext()
- with foo: pass
- self.assertRaises(AttributeError, fooLacksContext)
-
def testEnterAttributeError(self):
class LacksEnter(object):
- def __context__(self):
- pass
-
def __exit__(self, type, value, traceback):
pass
@@ -154,9 +125,6 @@ class FailureTestCase(unittest.TestCase):
def testExitAttributeError(self):
class LacksExit(object):
- def __context__(self):
- pass
-
def __enter__(self):
pass
@@ -192,27 +160,10 @@ class FailureTestCase(unittest.TestCase):
'with mock as (foo, None, bar):\n'
' pass')
- def testContextThrows(self):
- class ContextThrows(object):
- def __context__(self):
- raise RuntimeError("Context threw")
-
- def shouldThrow():
- ct = ContextThrows()
- self.foo = None
- with ct as self.foo:
- pass
- self.assertRaises(RuntimeError, shouldThrow)
- self.assertEqual(self.foo, None)
-
def testEnterThrows(self):
class EnterThrows(object):
- def __context__(self):
- return self
-
def __enter__(self):
- raise RuntimeError("Context threw")
-
+ raise RuntimeError("Enter threw")
def __exit__(self, *args):
pass
@@ -226,8 +177,6 @@ class FailureTestCase(unittest.TestCase):
def testExitThrows(self):
class ExitThrows(object):
- def __context__(self):
- return self
def __enter__(self):
return
def __exit__(self, *args):
@@ -241,13 +190,11 @@ class ContextmanagerAssertionMixin(object):
TEST_EXCEPTION = RuntimeError("test exception")
def assertInWithManagerInvariants(self, mock_manager):
- self.assertTrue(mock_manager.context_called)
self.assertTrue(mock_manager.enter_called)
self.assertFalse(mock_manager.exit_called)
self.assertEqual(mock_manager.exit_args, None)
def assertAfterWithManagerInvariants(self, mock_manager, exit_args):
- self.assertTrue(mock_manager.context_called)
self.assertTrue(mock_manager.enter_called)
self.assertTrue(mock_manager.exit_called)
self.assertEqual(mock_manager.exit_args, exit_args)
@@ -268,7 +215,6 @@ class ContextmanagerAssertionMixin(object):
raise self.TEST_EXCEPTION
def assertAfterWithManagerInvariantsWithError(self, mock_manager):
- self.assertTrue(mock_manager.context_called)
self.assertTrue(mock_manager.enter_called)
self.assertTrue(mock_manager.exit_called)
self.assertEqual(mock_manager.exit_args[0], RuntimeError)
@@ -472,7 +418,6 @@ class ExceptionalTestCase(unittest.TestCase, ContextmanagerAssertionMixin):
# The inner statement stuff should never have been touched
self.assertEqual(self.bar, None)
- self.assertFalse(mock_b.context_called)
self.assertFalse(mock_b.enter_called)
self.assertFalse(mock_b.exit_called)
self.assertEqual(mock_b.exit_args, None)
@@ -506,13 +451,9 @@ class ExceptionalTestCase(unittest.TestCase, ContextmanagerAssertionMixin):
self.assertRaises(StopIteration, shouldThrow)
def testRaisedStopIteration2(self):
- class cm (object):
- def __context__(self):
- return self
-
+ class cm(object):
def __enter__(self):
pass
-
def __exit__(self, type, value, traceback):
pass
@@ -535,12 +476,8 @@ class ExceptionalTestCase(unittest.TestCase, ContextmanagerAssertionMixin):
def testRaisedGeneratorExit2(self):
class cm (object):
- def __context__(self):
- return self
-
def __enter__(self):
pass
-
def __exit__(self, type, value, traceback):
pass
@@ -629,7 +566,6 @@ class AssignmentTargetTestCase(unittest.TestCase):
def testMultipleComplexTargets(self):
class C:
- def __context__(self): return self
def __enter__(self): return 1, 2, 3
def __exit__(self, t, v, tb): pass
targets = {1: [0, 1, 2]}
@@ -651,7 +587,6 @@ class ExitSwallowsExceptionTestCase(unittest.TestCase):
def testExitTrueSwallowsException(self):
class AfricanSwallow:
- def __context__(self): return self
def __enter__(self): pass
def __exit__(self, t, v, tb): return True
try:
@@ -662,7 +597,6 @@ class ExitSwallowsExceptionTestCase(unittest.TestCase):
def testExitFalseDoesntSwallowException(self):
class EuropeanSwallow:
- def __context__(self): return self
def __enter__(self): pass
def __exit__(self, t, v, tb): return False
try:
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py
index 7634680..ccbc8fd 100644
--- a/Lib/test/test_zlib.py
+++ b/Lib/test/test_zlib.py
@@ -302,6 +302,63 @@ class CompressObjectTestCase(unittest.TestCase):
dco = zlib.decompressobj()
self.assertEqual(dco.flush(), "") # Returns nothing
+ def test_compresscopy(self):
+ # Test copying a compression object
+ data0 = HAMLET_SCENE
+ data1 = HAMLET_SCENE.swapcase()
+ c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
+ bufs0 = []
+ bufs0.append(c0.compress(data0))
+
+ c1 = c0.copy()
+ bufs1 = bufs0[:]
+
+ bufs0.append(c0.compress(data0))
+ bufs0.append(c0.flush())
+ s0 = ''.join(bufs0)
+
+ bufs1.append(c1.compress(data1))
+ bufs1.append(c1.flush())
+ s1 = ''.join(bufs1)
+
+ self.assertEqual(zlib.decompress(s0),data0+data0)
+ self.assertEqual(zlib.decompress(s1),data0+data1)
+
+ def test_badcompresscopy(self):
+ # Test copying a compression object in an inconsistent state
+ c = zlib.compressobj()
+ c.compress(HAMLET_SCENE)
+ c.flush()
+ self.assertRaises(ValueError, c.copy)
+
+ def test_decompresscopy(self):
+ # Test copying a decompression object
+ data = HAMLET_SCENE
+ comp = zlib.compress(data)
+
+ d0 = zlib.decompressobj()
+ bufs0 = []
+ bufs0.append(d0.decompress(comp[:32]))
+
+ d1 = d0.copy()
+ bufs1 = bufs0[:]
+
+ bufs0.append(d0.decompress(comp[32:]))
+ s0 = ''.join(bufs0)
+
+ bufs1.append(d1.decompress(comp[32:]))
+ s1 = ''.join(bufs1)
+
+ self.assertEqual(s0,s1)
+ self.assertEqual(s0,data)
+
+ def test_baddecompresscopy(self):
+ # Test copying a compression object in an inconsistent state
+ data = zlib.compress(HAMLET_SCENE)
+ d = zlib.decompressobj()
+ d.decompress(data)
+ d.flush()
+ self.assertRaises(ValueError, d.copy)
def genblock(seed, length, step=1024, generator=random):
"""length-byte stream of random data from a seed (in step-byte blocks)."""
diff --git a/Lib/test/testtar.tar b/Lib/test/testtar.tar
index 8fd0c50..1f4493f 100644
--- a/Lib/test/testtar.tar
+++ b/Lib/test/testtar.tar
Binary files differ
diff --git a/Lib/test/threaded_import_hangers.py b/Lib/test/threaded_import_hangers.py
new file mode 100644
index 0000000..b21c52f
--- /dev/null
+++ b/Lib/test/threaded_import_hangers.py
@@ -0,0 +1,42 @@
+# This is a helper module for test_threaded_import. The test imports this
+# module, and this module tries to run various Python library functions in
+# their own thread, as a side effect of being imported. If the spawned
+# thread doesn't complete in TIMEOUT seconds, an "appeared to hang" message
+# is appended to the module-global `errors` list. That list remains empty
+# if (and only if) all functions tested complete.
+
+TIMEOUT = 10
+
+import threading
+
+import tempfile
+import os.path
+
+errors = []
+
+# This class merely runs a function in its own thread T. The thread importing
+# this module holds the import lock, so if the function called by T tries
+# to do its own imports it will block waiting for this module's import
+# to complete.
+class Worker(threading.Thread):
+ def __init__(self, function, args):
+ threading.Thread.__init__(self)
+ self.function = function
+ self.args = args
+
+ def run(self):
+ self.function(*self.args)
+
+for name, func, args in [
+ # Bug 147376: TemporaryFile hung on Windows, starting in Python 2.4.
+ ("tempfile.TemporaryFile", tempfile.TemporaryFile, ()),
+
+ # The real cause for bug 147376: ntpath.abspath() caused the hang.
+ ("os.path.abspath", os.path.abspath, ('.',)),
+ ]:
+
+ t = Worker(func, args)
+ t.start()
+ t.join(TIMEOUT)
+ if t.isAlive():
+ errors.append("%s appeared to hang" % name)