diff options
Diffstat (limited to 'Lib/test')
61 files changed, 4687 insertions, 553 deletions
diff --git a/Lib/test/exception_hierarchy.txt b/Lib/test/exception_hierarchy.txt index 9ed92d0..58131d7 100644 --- a/Lib/test/exception_hierarchy.txt +++ b/Lib/test/exception_hierarchy.txt @@ -15,6 +15,7 @@ BaseException | | +-- IOError | | +-- OSError | | +-- WindowsError (Windows) + | | +-- VMSError (VMS) | +-- EOFError | +-- ImportError | +-- LookupError @@ -43,4 +44,4 @@ BaseException +-- SyntaxWarning +-- UserWarning +-- FutureWarning - +-- OverflowWarning [not generated by the interpreter] + +-- ImportWarning diff --git a/Lib/test/output/test_logging b/Lib/test/output/test_logging index 7be3a3e..c0d6e06 100644 --- a/Lib/test/output/test_logging +++ b/Lib/test/output/test_logging @@ -488,12 +488,12 @@ INFO:a.b.c.d:Info 5 -- log_test4 begin --------------------------------------------------- config0: ok. config1: ok. -config2: <class 'exceptions.AttributeError'> -config3: <class 'exceptions.KeyError'> +config2: <type 'exceptions.AttributeError'> +config3: <type 'exceptions.KeyError'> -- log_test4 end --------------------------------------------------- -- log_test5 begin --------------------------------------------------- ERROR:root:just testing -<class 'exceptions.KeyError'>... Don't panic! +<type 'exceptions.KeyError'>... Don't panic! -- log_test5 end --------------------------------------------------- -- logrecv output begin --------------------------------------------------- ERR -> CRITICAL: Message 0 (via logrecv.tcp.ERR) diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index 85e1dea..5b9da56 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -4,7 +4,8 @@ import cPickle import pickletools import copy_reg -from test.test_support import TestFailed, have_unicode, TESTFN +from test.test_support import TestFailed, have_unicode, TESTFN, \ + run_with_locale # Tests that try a number of pickle protocols should have a # for proto in protocols: @@ -527,6 +528,11 @@ class AbstractPickleTests(unittest.TestCase): got = self.loads(p) self.assertEqual(n, got) + @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') + def test_float_format(self): + # make sure that floats are formatted locale independent + self.assertEqual(self.dumps(1.2)[0:3], 'F1.') + def test_reduce(self): pass diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 566e54b..86961b0 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -25,6 +25,7 @@ Command line options: -N: nocoverdir -- Put coverage files alongside modules -L: runleaks -- run the leaks(1) command just before exit -R: huntrleaks -- search for reference leaks (needs debug build, v. slow) +-M: memlimit -- run very large memory-consuming tests If non-option arguments are present, they are names for tests to run, unless -x is given, in which case they are names for tests not to run. @@ -63,6 +64,19 @@ of times further it is run and 'fname' is the name of the file the reports are written to. These parameters all have defaults (5, 4 and "reflog.txt" respectively), so the minimal invocation is '-R ::'. +-M runs tests that require an exorbitant amount of memory. These tests +typically try to ascertain containers keep working when containing more than +2 bilion objects, and only work on 64-bit systems. The passed-in memlimit, +which is a string in the form of '2.5Gb', determines howmuch memory the +tests will limit themselves to (but they may go slightly over.) The number +shouldn't be more memory than the machine has (including swap memory). You +should also keep in mind that swap memory is generally much, much slower +than RAM, and setting memlimit to all available RAM or higher will heavily +tax the machine. On the other hand, it is no use running these tests with a +limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect +to use more than memlimit memory will be skipped. The big-memory tests +generally run very, very long. + -u is used to specify which special resource intensive tests to run, such as those requiring large file support or network connectivity. The argument is a comma-separated list of words indicating the @@ -124,6 +138,14 @@ if sys.maxint > 0x7fffffff: warnings.filterwarnings("ignore", "hex/oct constants", FutureWarning, "<string>") +# Ignore ImportWarnings that only occur in the source tree, +# (because of modules with the same name as source-directories in Modules/) +for mod in ("ctypes", "gzip", "zipfile", "tarfile", "encodings.zlib_codec", + "test.test_zipimport", "test.test_zlib", "test.test_zipfile", + "test.test_codecs", "test.string_tests"): + warnings.filterwarnings(module=".*%s$" % (mod,), + action="ignore", category=ImportWarning) + # MacOSX (a.k.a. Darwin) has a default stack size that is too small # for deeply recursive regular expressions. We see this as crashes in # the Python test suite when running test_re.py and test_sre.py. The @@ -180,12 +202,12 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, generate=False, test_support.record_original_stdout(sys.stdout) try: - opts, args = getopt.getopt(sys.argv[1:], 'hvgqxsrf:lu:t:TD:NLR:w', + opts, args = getopt.getopt(sys.argv[1:], 'hvgqxsrf:lu:t:TD:NLR:wM:', ['help', 'verbose', 'quiet', 'generate', 'exclude', 'single', 'random', 'fromfile', 'findleaks', 'use=', 'threshold=', 'trace', 'coverdir=', 'nocoverdir', 'runleaks', - 'huntrleaks=', 'verbose2', + 'huntrleaks=', 'verbose2', 'memlimit=', ]) except getopt.error, msg: usage(2, msg) @@ -241,6 +263,8 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, generate=False, huntrleaks[1] = int(huntrleaks[1]) if len(huntrleaks[2]) == 0: huntrleaks[2] = "reflog.txt" + elif o in ('-M', '--memlimit'): + test_support.set_memlimit(a) elif o in ('-u', '--use'): u = [x.lower() for x in a.split(',')] for r in u: @@ -521,6 +545,7 @@ def runtest(test, generate, verbose, quiet, testdir=None, huntrleaks=False): def cleanup(): import _strptime, linecache, warnings, dircache import urlparse, urllib, urllib2, mimetypes, doctest + import struct from distutils.dir_util import _path_created _path_created.clear() warnings.filters[:] = fs @@ -537,6 +562,7 @@ def runtest(test, generate, verbose, quiet, testdir=None, huntrleaks=False): dircache.reset() linecache.clearcache() mimetypes._default_mime_types() + struct._cache.clear() doctest.master = None if indirect_test: def run_the_test(): diff --git a/Lib/test/string_tests.py b/Lib/test/string_tests.py index aab98c2..489af20 100644 --- a/Lib/test/string_tests.py +++ b/Lib/test/string_tests.py @@ -243,29 +243,72 @@ class CommonTest(unittest.TestCase): self.checkequal(['a', 'b', 'c d'], 'a b c d', 'split', None, 2) self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'split', None, 3) self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'split', None, 4) + self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'split', None, + sys.maxint-1) self.checkequal(['a b c d'], 'a b c d', 'split', None, 0) + self.checkequal(['a b c d'], ' a b c d', 'split', None, 0) self.checkequal(['a', 'b', 'c d'], 'a b c d', 'split', None, 2) + self.checkequal([], ' ', 'split') + self.checkequal(['a'], ' a ', 'split') + self.checkequal(['a', 'b'], ' a b ', 'split') + self.checkequal(['a', 'b '], ' a b ', 'split', None, 1) + self.checkequal(['a', 'b c '], ' a b c ', 'split', None, 1) + self.checkequal(['a', 'b', 'c '], ' a b c ', 'split', None, 2) + self.checkequal(['a', 'b'], '\n\ta \t\r b \v ', 'split') + aaa = ' a '*20 + self.checkequal(['a']*20, aaa, 'split') + self.checkequal(['a'] + [aaa[4:]], aaa, 'split', None, 1) + self.checkequal(['a']*19 + ['a '], aaa, 'split', None, 19) + # by a char self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|') + self.checkequal(['a|b|c|d'], 'a|b|c|d', 'split', '|', 0) self.checkequal(['a', 'b|c|d'], 'a|b|c|d', 'split', '|', 1) self.checkequal(['a', 'b', 'c|d'], 'a|b|c|d', 'split', '|', 2) self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|', 3) self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|', 4) + self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'split', '|', + sys.maxint-2) self.checkequal(['a|b|c|d'], 'a|b|c|d', 'split', '|', 0) self.checkequal(['a', '', 'b||c||d'], 'a||b||c||d', 'split', '|', 2) self.checkequal(['endcase ', ''], 'endcase |', 'split', '|') + self.checkequal(['', ' startcase'], '| startcase', 'split', '|') + self.checkequal(['', 'bothcase', ''], '|bothcase|', 'split', '|') self.checkequal(['a', '', 'b\x00c\x00d'], 'a\x00\x00b\x00c\x00d', 'split', '\x00', 2) + self.checkequal(['a']*20, ('a|'*20)[:-1], 'split', '|') + self.checkequal(['a']*15 +['a|a|a|a|a'], + ('a|'*20)[:-1], 'split', '|', 15) + # by string self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//') self.checkequal(['a', 'b//c//d'], 'a//b//c//d', 'split', '//', 1) self.checkequal(['a', 'b', 'c//d'], 'a//b//c//d', 'split', '//', 2) self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//', 3) self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//', 4) + self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'split', '//', + sys.maxint-10) self.checkequal(['a//b//c//d'], 'a//b//c//d', 'split', '//', 0) self.checkequal(['a', '', 'b////c////d'], 'a////b////c////d', 'split', '//', 2) self.checkequal(['endcase ', ''], 'endcase test', 'split', 'test') + self.checkequal(['', ' begincase'], 'test begincase', 'split', 'test') + self.checkequal(['', ' bothcase ', ''], 'test bothcase test', + 'split', 'test') + self.checkequal(['a', 'bc'], 'abbbc', 'split', 'bb') + self.checkequal(['', ''], 'aaa', 'split', 'aaa') + self.checkequal(['aaa'], 'aaa', 'split', 'aaa', 0) + self.checkequal(['ab', 'ab'], 'abbaab', 'split', 'ba') + self.checkequal(['aaaa'], 'aaaa', 'split', 'aab') + self.checkequal([''], '', 'split', 'aaa') + self.checkequal(['aa'], 'aa', 'split', 'aaa') + self.checkequal(['A', 'bobb'], 'Abbobbbobb', 'split', 'bbobb') + self.checkequal(['A', 'B', ''], 'AbbobbBbbobb', 'split', 'bbobb') + + self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'split', 'BLAH') + self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'split', 'BLAH', 19) + self.checkequal(['a']*18 + ['aBLAHa'], ('aBLAH'*20)[:-4], + 'split', 'BLAH', 18) # mixed use of str and unicode self.checkequal([u'a', u'b', u'c d'], 'a b c d', 'split', u' ', 2) @@ -273,6 +316,10 @@ class CommonTest(unittest.TestCase): # argument type self.checkraises(TypeError, 'hello', 'split', 42, 42, 42) + # null case + self.checkraises(ValueError, 'hello', 'split', '') + self.checkraises(ValueError, 'hello', 'split', '', 0) + def test_rsplit(self): self.checkequal(['this', 'is', 'the', 'rsplit', 'function'], 'this is the rsplit function', 'rsplit') @@ -283,29 +330,75 @@ class CommonTest(unittest.TestCase): self.checkequal(['a b', 'c', 'd'], 'a b c d', 'rsplit', None, 2) self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'rsplit', None, 3) self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'rsplit', None, 4) + self.checkequal(['a', 'b', 'c', 'd'], 'a b c d', 'rsplit', None, + sys.maxint-20) self.checkequal(['a b c d'], 'a b c d', 'rsplit', None, 0) + self.checkequal(['a b c d'], 'a b c d ', 'rsplit', None, 0) self.checkequal(['a b', 'c', 'd'], 'a b c d', 'rsplit', None, 2) + self.checkequal([], ' ', 'rsplit') + self.checkequal(['a'], ' a ', 'rsplit') + self.checkequal(['a', 'b'], ' a b ', 'rsplit') + self.checkequal([' a', 'b'], ' a b ', 'rsplit', None, 1) + self.checkequal([' a b','c'], ' a b c ', 'rsplit', + None, 1) + self.checkequal([' a', 'b', 'c'], ' a b c ', 'rsplit', + None, 2) + self.checkequal(['a', 'b'], '\n\ta \t\r b \v ', 'rsplit', None, 88) + aaa = ' a '*20 + self.checkequal(['a']*20, aaa, 'rsplit') + self.checkequal([aaa[:-4]] + ['a'], aaa, 'rsplit', None, 1) + self.checkequal([' a a'] + ['a']*18, aaa, 'rsplit', None, 18) + + # by a char self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|') self.checkequal(['a|b|c', 'd'], 'a|b|c|d', 'rsplit', '|', 1) self.checkequal(['a|b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|', 2) self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|', 3) self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|', 4) + self.checkequal(['a', 'b', 'c', 'd'], 'a|b|c|d', 'rsplit', '|', + sys.maxint-100) self.checkequal(['a|b|c|d'], 'a|b|c|d', 'rsplit', '|', 0) self.checkequal(['a||b||c', '', 'd'], 'a||b||c||d', 'rsplit', '|', 2) self.checkequal(['', ' begincase'], '| begincase', 'rsplit', '|') + self.checkequal(['endcase ', ''], 'endcase |', 'rsplit', '|') + self.checkequal(['', 'bothcase', ''], '|bothcase|', 'rsplit', '|') + self.checkequal(['a\x00\x00b', 'c', 'd'], 'a\x00\x00b\x00c\x00d', 'rsplit', '\x00', 2) + self.checkequal(['a']*20, ('a|'*20)[:-1], 'rsplit', '|') + self.checkequal(['a|a|a|a|a']+['a']*15, + ('a|'*20)[:-1], 'rsplit', '|', 15) + # by string self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//') self.checkequal(['a//b//c', 'd'], 'a//b//c//d', 'rsplit', '//', 1) self.checkequal(['a//b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//', 2) self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//', 3) self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//', 4) + self.checkequal(['a', 'b', 'c', 'd'], 'a//b//c//d', 'rsplit', '//', + sys.maxint-5) self.checkequal(['a//b//c//d'], 'a//b//c//d', 'rsplit', '//', 0) self.checkequal(['a////b////c', '', 'd'], 'a////b////c////d', 'rsplit', '//', 2) self.checkequal(['', ' begincase'], 'test begincase', 'rsplit', 'test') + self.checkequal(['endcase ', ''], 'endcase test', 'rsplit', 'test') + self.checkequal(['', ' bothcase ', ''], 'test bothcase test', + 'rsplit', 'test') + self.checkequal(['ab', 'c'], 'abbbc', 'rsplit', 'bb') + self.checkequal(['', ''], 'aaa', 'rsplit', 'aaa') + self.checkequal(['aaa'], 'aaa', 'rsplit', 'aaa', 0) + self.checkequal(['ab', 'ab'], 'abbaab', 'rsplit', 'ba') + self.checkequal(['aaaa'], 'aaaa', 'rsplit', 'aab') + self.checkequal([''], '', 'rsplit', 'aaa') + self.checkequal(['aa'], 'aa', 'rsplit', 'aaa') + self.checkequal(['bbob', 'A'], 'bbobbbobbA', 'rsplit', 'bbobb') + self.checkequal(['', 'B', 'A'], 'bbobbBbbobbA', 'rsplit', 'bbobb') + + self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'rsplit', 'BLAH') + self.checkequal(['a']*20, ('aBLAH'*20)[:-4], 'rsplit', 'BLAH', 19) + self.checkequal(['aBLAHa'] + ['a']*18, ('aBLAH'*20)[:-4], + 'rsplit', 'BLAH', 18) # mixed use of str and unicode self.checkequal([u'a b', u'c', u'd'], 'a b c d', 'rsplit', u' ', 2) @@ -313,6 +406,10 @@ class CommonTest(unittest.TestCase): # argument type self.checkraises(TypeError, 'hello', 'rsplit', 42, 42, 42) + # null case + self.checkraises(ValueError, 'hello', 'rsplit', '') + self.checkraises(ValueError, 'hello', 'rsplit', '', 0) + def test_strip(self): self.checkequal('hello', ' hello ', 'strip') self.checkequal('hello ', ' hello ', 'lstrip') @@ -376,6 +473,158 @@ class CommonTest(unittest.TestCase): self.checkraises(TypeError, 'hello', 'swapcase', 42) def test_replace(self): + EQ = self.checkequal + + # Operations on the empty string + EQ("", "", "replace", "", "") + + #EQ("A", "", "replace", "", "A") + # That was the correct result; this is the result we actually get + # now (for str, but not for unicode): + #EQ("", "", "replace", "", "A") + + EQ("", "", "replace", "A", "") + EQ("", "", "replace", "A", "A") + EQ("", "", "replace", "", "", 100) + EQ("", "", "replace", "", "", sys.maxint) + + # interleave (from=="", 'to' gets inserted everywhere) + EQ("A", "A", "replace", "", "") + EQ("*A*", "A", "replace", "", "*") + EQ("*1A*1", "A", "replace", "", "*1") + EQ("*-#A*-#", "A", "replace", "", "*-#") + EQ("*-A*-A*-", "AA", "replace", "", "*-") + EQ("*-A*-A*-", "AA", "replace", "", "*-", -1) + EQ("*-A*-A*-", "AA", "replace", "", "*-", sys.maxint) + EQ("*-A*-A*-", "AA", "replace", "", "*-", 4) + EQ("*-A*-A*-", "AA", "replace", "", "*-", 3) + EQ("*-A*-A", "AA", "replace", "", "*-", 2) + EQ("*-AA", "AA", "replace", "", "*-", 1) + EQ("AA", "AA", "replace", "", "*-", 0) + + # single character deletion (from=="A", to=="") + EQ("", "A", "replace", "A", "") + EQ("", "AAA", "replace", "A", "") + EQ("", "AAA", "replace", "A", "", -1) + EQ("", "AAA", "replace", "A", "", sys.maxint) + EQ("", "AAA", "replace", "A", "", 4) + EQ("", "AAA", "replace", "A", "", 3) + EQ("A", "AAA", "replace", "A", "", 2) + EQ("AA", "AAA", "replace", "A", "", 1) + EQ("AAA", "AAA", "replace", "A", "", 0) + EQ("", "AAAAAAAAAA", "replace", "A", "") + EQ("BCD", "ABACADA", "replace", "A", "") + EQ("BCD", "ABACADA", "replace", "A", "", -1) + EQ("BCD", "ABACADA", "replace", "A", "", sys.maxint) + EQ("BCD", "ABACADA", "replace", "A", "", 5) + EQ("BCD", "ABACADA", "replace", "A", "", 4) + EQ("BCDA", "ABACADA", "replace", "A", "", 3) + EQ("BCADA", "ABACADA", "replace", "A", "", 2) + EQ("BACADA", "ABACADA", "replace", "A", "", 1) + EQ("ABACADA", "ABACADA", "replace", "A", "", 0) + EQ("BCD", "ABCAD", "replace", "A", "") + EQ("BCD", "ABCADAA", "replace", "A", "") + EQ("BCD", "BCD", "replace", "A", "") + EQ("*************", "*************", "replace", "A", "") + EQ("^A^", "^"+"A"*1000+"^", "replace", "A", "", 999) + + # substring deletion (from=="the", to=="") + EQ("", "the", "replace", "the", "") + EQ("ater", "theater", "replace", "the", "") + EQ("", "thethe", "replace", "the", "") + EQ("", "thethethethe", "replace", "the", "") + EQ("aaaa", "theatheatheathea", "replace", "the", "") + EQ("that", "that", "replace", "the", "") + EQ("thaet", "thaet", "replace", "the", "") + EQ("here and re", "here and there", "replace", "the", "") + EQ("here and re and re", "here and there and there", + "replace", "the", "", sys.maxint) + EQ("here and re and re", "here and there and there", + "replace", "the", "", -1) + EQ("here and re and re", "here and there and there", + "replace", "the", "", 3) + EQ("here and re and re", "here and there and there", + "replace", "the", "", 2) + EQ("here and re and there", "here and there and there", + "replace", "the", "", 1) + EQ("here and there and there", "here and there and there", + "replace", "the", "", 0) + EQ("here and re and re", "here and there and there", "replace", "the", "") + + EQ("abc", "abc", "replace", "the", "") + EQ("abcdefg", "abcdefg", "replace", "the", "") + + # substring deletion (from=="bob", to=="") + EQ("bob", "bbobob", "replace", "bob", "") + EQ("bobXbob", "bbobobXbbobob", "replace", "bob", "") + EQ("aaaaaaa", "aaaaaaabob", "replace", "bob", "") + EQ("aaaaaaa", "aaaaaaa", "replace", "bob", "") + + # single character replace in place (len(from)==len(to)==1) + EQ("Who goes there?", "Who goes there?", "replace", "o", "o") + EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O") + EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", sys.maxint) + EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", -1) + EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", 3) + EQ("WhO gOes there?", "Who goes there?", "replace", "o", "O", 2) + EQ("WhO goes there?", "Who goes there?", "replace", "o", "O", 1) + EQ("Who goes there?", "Who goes there?", "replace", "o", "O", 0) + + EQ("Who goes there?", "Who goes there?", "replace", "a", "q") + EQ("who goes there?", "Who goes there?", "replace", "W", "w") + EQ("wwho goes there?ww", "WWho goes there?WW", "replace", "W", "w") + EQ("Who goes there!", "Who goes there?", "replace", "?", "!") + EQ("Who goes there!!", "Who goes there??", "replace", "?", "!") + + EQ("Who goes there?", "Who goes there?", "replace", ".", "!") + + # substring replace in place (len(from)==len(to) > 1) + EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**") + EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", sys.maxint) + EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", -1) + EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", 4) + EQ("Th** ** a t**sue", "This is a tissue", "replace", "is", "**", 3) + EQ("Th** ** a tissue", "This is a tissue", "replace", "is", "**", 2) + EQ("Th** is a tissue", "This is a tissue", "replace", "is", "**", 1) + EQ("This is a tissue", "This is a tissue", "replace", "is", "**", 0) + EQ("cobob", "bobob", "replace", "bob", "cob") + EQ("cobobXcobocob", "bobobXbobobob", "replace", "bob", "cob") + EQ("bobob", "bobob", "replace", "bot", "bot") + + # replace single character (len(from)==1, len(to)>1) + EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK") + EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK", -1) + EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK", sys.maxint) + EQ("ReyKKjaviKK", "Reykjavik", "replace", "k", "KK", 2) + EQ("ReyKKjavik", "Reykjavik", "replace", "k", "KK", 1) + EQ("Reykjavik", "Reykjavik", "replace", "k", "KK", 0) + EQ("A----B----C----", "A.B.C.", "replace", ".", "----") + + EQ("Reykjavik", "Reykjavik", "replace", "q", "KK") + + # replace substring (len(from)>1, len(to)!=len(from)) + EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam", + "replace", "spam", "ham") + EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam", + "replace", "spam", "ham", sys.maxint) + EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam", + "replace", "spam", "ham", -1) + EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam", + "replace", "spam", "ham", 4) + EQ("ham, ham, eggs and ham", "spam, spam, eggs and spam", + "replace", "spam", "ham", 3) + EQ("ham, ham, eggs and spam", "spam, spam, eggs and spam", + "replace", "spam", "ham", 2) + EQ("ham, spam, eggs and spam", "spam, spam, eggs and spam", + "replace", "spam", "ham", 1) + EQ("spam, spam, eggs and spam", "spam, spam, eggs and spam", + "replace", "spam", "ham", 0) + + EQ("bobob", "bobobob", "replace", "bobob", "bob") + EQ("bobobXbobob", "bobobobXbobobob", "replace", "bobob", "bob") + EQ("BOBOBOB", "BOBOBOB", "replace", "bob", "bobby") + + # self.checkequal('one@two!three!', 'one!two!three!', 'replace', '!', '@', 1) self.checkequal('onetwothree', 'one!two!three!', 'replace', '!', '') self.checkequal('one@two@three!', 'one!two!three!', 'replace', '!', '@', 2) @@ -403,6 +652,15 @@ class CommonTest(unittest.TestCase): self.checkraises(TypeError, 'hello', 'replace', 42, 'h') self.checkraises(TypeError, 'hello', 'replace', 'h', 42) + def test_replace_overflow(self): + # Check for overflow checking on 32 bit machines + if sys.maxint != 2147483647: + return + A2_16 = "A" * (2**16) + self.checkraises(OverflowError, A2_16, "replace", "", A2_16) + self.checkraises(OverflowError, A2_16, "replace", "A", A2_16) + self.checkraises(OverflowError, A2_16, "replace", "AA", A2_16+A2_16) + def test_zfill(self): self.checkequal('123', '123', 'zfill', 2) self.checkequal('123', '123', 'zfill', 3) @@ -720,6 +978,55 @@ class MixinStrUnicodeUserStringTest: else: self.checkcall(format, "__mod__", value) + def test_inplace_rewrites(self): + # Check that strings don't copy and modify cached single-character strings + self.checkequal('a', 'A', 'lower') + self.checkequal(True, 'A', 'isupper') + self.checkequal('A', 'a', 'upper') + self.checkequal(True, 'a', 'islower') + + self.checkequal('a', 'A', 'replace', 'A', 'a') + self.checkequal(True, 'A', 'isupper') + + self.checkequal('A', 'a', 'capitalize') + self.checkequal(True, 'a', 'islower') + + self.checkequal('A', 'a', 'swapcase') + self.checkequal(True, 'a', 'islower') + + self.checkequal('A', 'a', 'title') + self.checkequal(True, 'a', 'islower') + + def test_partition(self): + + self.checkequal(('this is the par', 'ti', 'tion method'), + 'this is the partition method', 'partition', 'ti') + + # from raymond's original specification + S = 'http://www.python.org' + self.checkequal(('http', '://', 'www.python.org'), S, 'partition', '://') + self.checkequal(('http://www.python.org', '', ''), S, 'partition', '?') + self.checkequal(('', 'http://', 'www.python.org'), S, 'partition', 'http://') + self.checkequal(('http://www.python.', 'org', ''), S, 'partition', 'org') + + self.checkraises(ValueError, S, 'partition', '') + self.checkraises(TypeError, S, 'partition', None) + + def test_rpartition(self): + + self.checkequal(('this is the rparti', 'ti', 'on method'), + 'this is the rpartition method', 'rpartition', 'ti') + + # from raymond's original specification + S = 'http://www.python.org' + self.checkequal(('http', '://', 'www.python.org'), S, 'rpartition', '://') + self.checkequal(('http://www.python.org', '', ''), S, 'rpartition', '?') + self.checkequal(('', 'http://', 'www.python.org'), S, 'rpartition', 'http://') + self.checkequal(('http://www.python.', 'org', ''), S, 'rpartition', 'org') + + self.checkraises(ValueError, S, 'rpartition', '') + self.checkraises(TypeError, S, 'rpartition', None) + class MixinStrStringUserStringTest: # Additional tests for 8bit strings, i.e. str, UserString and diff --git a/Lib/test/test_bigmem.py b/Lib/test/test_bigmem.py new file mode 100644 index 0000000..255428f --- /dev/null +++ b/Lib/test/test_bigmem.py @@ -0,0 +1,964 @@ +from test import test_support +from test.test_support import bigmemtest, _1G, _2G + +import unittest +import operator +import string +import sys + +# Bigmem testing houserules: +# +# - Try not to allocate too many large objects. It's okay to rely on +# refcounting semantics, but don't forget that 's = create_largestring()' +# doesn't release the old 's' (if it exists) until well after its new +# value has been created. Use 'del s' before the create_largestring call. +# +# - Do *not* compare large objects using assertEquals or similar. It's a +# lengty operation and the errormessage will be utterly useless due to +# its size. To make sure whether a result has the right contents, better +# to use the strip or count methods, or compare meaningful slices. +# +# - Don't forget to test for large indices, offsets and results and such, +# in addition to large sizes. +# +# - When repeating an object (say, a substring, or a small list) to create +# a large object, make the subobject of a length that is not a power of +# 2. That way, int-wrapping problems are more easily detected. +# +# - While the bigmemtest decorator speaks of 'minsize', all tests will +# actually be called with a much smaller number too, in the normal +# test run (5Kb currently.) This is so the tests themselves get frequent +# testing Consequently, always make all large allocations based on the +# passed-in 'size', and don't rely on the size being very large. Also, +# memuse-per-size should remain sane (less than a few thousand); if your +# test uses more, adjust 'size' upward, instead. + +class StrTest(unittest.TestCase): + @bigmemtest(minsize=_2G, memuse=2) + def test_capitalize(self, size): + SUBSTR = ' abc def ghi' + s = '-' * size + SUBSTR + caps = s.capitalize() + self.assertEquals(caps[-len(SUBSTR):], + SUBSTR.capitalize()) + self.assertEquals(caps.lstrip('-'), SUBSTR) + + @bigmemtest(minsize=_2G + 10, memuse=1) + def test_center(self, size): + SUBSTR = ' abc def ghi' + s = SUBSTR.center(size) + self.assertEquals(len(s), size) + lpadsize = rpadsize = (len(s) - len(SUBSTR)) // 2 + if len(s) % 2: + lpadsize += 1 + self.assertEquals(s[lpadsize:-rpadsize], SUBSTR) + self.assertEquals(s.strip(), SUBSTR.strip()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_count(self, size): + SUBSTR = ' abc def ghi' + s = '.' * size + SUBSTR + self.assertEquals(s.count('.'), size) + s += '.' + self.assertEquals(s.count('.'), size + 1) + self.assertEquals(s.count(' '), 3) + self.assertEquals(s.count('i'), 1) + self.assertEquals(s.count('j'), 0) + + @bigmemtest(minsize=0, memuse=1) + def test_decode(self, size): + pass + + @bigmemtest(minsize=0, memuse=1) + def test_encode(self, size): + pass + + @bigmemtest(minsize=_2G, memuse=2) + def test_endswith(self, size): + SUBSTR = ' abc def ghi' + s = '-' * size + SUBSTR + self.failUnless(s.endswith(SUBSTR)) + self.failUnless(s.endswith(s)) + s2 = '...' + s + self.failUnless(s2.endswith(s)) + self.failIf(s.endswith('a' + SUBSTR)) + self.failIf(SUBSTR.endswith(s)) + + @bigmemtest(minsize=_2G + 10, memuse=2) + def test_expandtabs(self, size): + s = '-' * size + tabsize = 8 + self.assertEquals(s.expandtabs(), s) + del s + slen, remainder = divmod(size, tabsize) + s = ' \t' * slen + s = s.expandtabs(tabsize) + self.assertEquals(len(s), size - remainder) + self.assertEquals(len(s.strip(' ')), 0) + + @bigmemtest(minsize=_2G, memuse=2) + def test_find(self, size): + SUBSTR = ' abc def ghi' + sublen = len(SUBSTR) + s = ''.join([SUBSTR, '-' * size, SUBSTR]) + self.assertEquals(s.find(' '), 0) + self.assertEquals(s.find(SUBSTR), 0) + self.assertEquals(s.find(' ', sublen), sublen + size) + self.assertEquals(s.find(SUBSTR, len(SUBSTR)), sublen + size) + self.assertEquals(s.find('i'), SUBSTR.find('i')) + self.assertEquals(s.find('i', sublen), + sublen + size + SUBSTR.find('i')) + self.assertEquals(s.find('i', size), + sublen + size + SUBSTR.find('i')) + self.assertEquals(s.find('j'), -1) + + @bigmemtest(minsize=_2G, memuse=2) + def test_index(self, size): + SUBSTR = ' abc def ghi' + sublen = len(SUBSTR) + s = ''.join([SUBSTR, '-' * size, SUBSTR]) + self.assertEquals(s.index(' '), 0) + self.assertEquals(s.index(SUBSTR), 0) + self.assertEquals(s.index(' ', sublen), sublen + size) + self.assertEquals(s.index(SUBSTR, sublen), sublen + size) + self.assertEquals(s.index('i'), SUBSTR.index('i')) + self.assertEquals(s.index('i', sublen), + sublen + size + SUBSTR.index('i')) + self.assertEquals(s.index('i', size), + sublen + size + SUBSTR.index('i')) + self.assertRaises(ValueError, s.index, 'j') + + @bigmemtest(minsize=_2G, memuse=2) + def test_isalnum(self, size): + SUBSTR = '123456' + s = 'a' * size + SUBSTR + self.failUnless(s.isalnum()) + s += '.' + self.failIf(s.isalnum()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_isalpha(self, size): + SUBSTR = 'zzzzzzz' + s = 'a' * size + SUBSTR + self.failUnless(s.isalpha()) + s += '.' + self.failIf(s.isalpha()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_isdigit(self, size): + SUBSTR = '123456' + s = '9' * size + SUBSTR + self.failUnless(s.isdigit()) + s += 'z' + self.failIf(s.isdigit()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_islower(self, size): + chars = ''.join([ chr(c) for c in range(255) if not chr(c).isupper() ]) + repeats = size // len(chars) + 2 + s = chars * repeats + self.failUnless(s.islower()) + s += 'A' + self.failIf(s.islower()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_isspace(self, size): + whitespace = ' \f\n\r\t\v' + repeats = size // len(whitespace) + 2 + s = whitespace * repeats + self.failUnless(s.isspace()) + s += 'j' + self.failIf(s.isspace()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_istitle(self, size): + SUBSTR = '123456' + s = ''.join(['A', 'a' * size, SUBSTR]) + self.failUnless(s.istitle()) + s += 'A' + self.failUnless(s.istitle()) + s += 'aA' + self.failIf(s.istitle()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_isupper(self, size): + chars = ''.join([ chr(c) for c in range(255) if not chr(c).islower() ]) + repeats = size // len(chars) + 2 + s = chars * repeats + self.failUnless(s.isupper()) + s += 'a' + self.failIf(s.isupper()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_join(self, size): + s = 'A' * size + x = s.join(['aaaaa', 'bbbbb']) + self.assertEquals(x.count('a'), 5) + self.assertEquals(x.count('b'), 5) + self.failUnless(x.startswith('aaaaaA')) + self.failUnless(x.endswith('Abbbbb')) + + @bigmemtest(minsize=_2G + 10, memuse=1) + def test_ljust(self, size): + SUBSTR = ' abc def ghi' + s = SUBSTR.ljust(size) + self.failUnless(s.startswith(SUBSTR + ' ')) + self.assertEquals(len(s), size) + self.assertEquals(s.strip(), SUBSTR.strip()) + + @bigmemtest(minsize=_2G + 10, memuse=2) + def test_lower(self, size): + s = 'A' * size + s = s.lower() + self.assertEquals(len(s), size) + self.assertEquals(s.count('a'), size) + + @bigmemtest(minsize=_2G + 10, memuse=1) + def test_lstrip(self, size): + SUBSTR = 'abc def ghi' + s = SUBSTR.rjust(size) + self.assertEquals(len(s), size) + self.assertEquals(s.lstrip(), SUBSTR.lstrip()) + del s + s = SUBSTR.ljust(size) + self.assertEquals(len(s), size) + stripped = s.lstrip() + self.failUnless(stripped is s) + + @bigmemtest(minsize=_2G + 10, memuse=2) + def test_replace(self, size): + replacement = 'a' + s = ' ' * size + s = s.replace(' ', replacement) + self.assertEquals(len(s), size) + self.assertEquals(s.count(replacement), size) + s = s.replace(replacement, ' ', size - 4) + self.assertEquals(len(s), size) + self.assertEquals(s.count(replacement), 4) + self.assertEquals(s[-10:], ' aaaa') + + @bigmemtest(minsize=_2G, memuse=2) + def test_rfind(self, size): + SUBSTR = ' abc def ghi' + sublen = len(SUBSTR) + s = ''.join([SUBSTR, '-' * size, SUBSTR]) + self.assertEquals(s.rfind(' '), sublen + size + SUBSTR.rfind(' ')) + self.assertEquals(s.rfind(SUBSTR), sublen + size) + self.assertEquals(s.rfind(' ', 0, size), SUBSTR.rfind(' ')) + self.assertEquals(s.rfind(SUBSTR, 0, sublen + size), 0) + self.assertEquals(s.rfind('i'), sublen + size + SUBSTR.rfind('i')) + self.assertEquals(s.rfind('i', 0, sublen), SUBSTR.rfind('i')) + self.assertEquals(s.rfind('i', 0, sublen + size), + SUBSTR.rfind('i')) + self.assertEquals(s.rfind('j'), -1) + + @bigmemtest(minsize=_2G, memuse=2) + def test_rindex(self, size): + SUBSTR = ' abc def ghi' + sublen = len(SUBSTR) + s = ''.join([SUBSTR, '-' * size, SUBSTR]) + self.assertEquals(s.rindex(' '), + sublen + size + SUBSTR.rindex(' ')) + self.assertEquals(s.rindex(SUBSTR), sublen + size) + self.assertEquals(s.rindex(' ', 0, sublen + size - 1), + SUBSTR.rindex(' ')) + self.assertEquals(s.rindex(SUBSTR, 0, sublen + size), 0) + self.assertEquals(s.rindex('i'), + sublen + size + SUBSTR.rindex('i')) + self.assertEquals(s.rindex('i', 0, sublen), SUBSTR.rindex('i')) + self.assertEquals(s.rindex('i', 0, sublen + size), + SUBSTR.rindex('i')) + self.assertRaises(ValueError, s.rindex, 'j') + + @bigmemtest(minsize=_2G + 10, memuse=1) + def test_rjust(self, size): + SUBSTR = ' abc def ghi' + s = SUBSTR.ljust(size) + self.failUnless(s.startswith(SUBSTR + ' ')) + self.assertEquals(len(s), size) + self.assertEquals(s.strip(), SUBSTR.strip()) + + @bigmemtest(minsize=_2G + 10, memuse=1) + def test_rstrip(self, size): + SUBSTR = ' abc def ghi' + s = SUBSTR.ljust(size) + self.assertEquals(len(s), size) + self.assertEquals(s.rstrip(), SUBSTR.rstrip()) + del s + s = SUBSTR.rjust(size) + self.assertEquals(len(s), size) + stripped = s.rstrip() + self.failUnless(stripped is s) + + # The test takes about size bytes to build a string, and then about + # sqrt(size) substrings of sqrt(size) in size and a list to + # hold sqrt(size) items. It's close but just over 2x size. + @bigmemtest(minsize=_2G, memuse=2.1) + def test_split_small(self, size): + # Crudely calculate an estimate so that the result of s.split won't + # take up an inordinate amount of memory + chunksize = int(size ** 0.5 + 2) + SUBSTR = 'a' + ' ' * chunksize + s = SUBSTR * chunksize + l = s.split() + self.assertEquals(len(l), chunksize) + self.assertEquals(set(l), set(['a'])) + del l + l = s.split('a') + self.assertEquals(len(l), chunksize + 1) + self.assertEquals(set(l), set(['', ' ' * chunksize])) + + # Allocates a string of twice size (and briefly two) and a list of + # size. Because of internal affairs, the s.split() call produces a + # list of size times the same one-character string, so we only + # suffer for the list size. (Otherwise, it'd cost another 48 times + # size in bytes!) Nevertheless, a list of size takes + # 8*size bytes. + @bigmemtest(minsize=_2G + 5, memuse=10) + def test_split_large(self, size): + s = ' a' * size + ' ' + l = s.split() + self.assertEquals(len(l), size) + self.assertEquals(set(l), set(['a'])) + del l + l = s.split('a') + self.assertEquals(len(l), size + 1) + self.assertEquals(set(l), set([' '])) + + @bigmemtest(minsize=_2G, memuse=2.1) + def test_splitlines(self, size): + # Crudely calculate an estimate so that the result of s.split won't + # take up an inordinate amount of memory + chunksize = int(size ** 0.5 + 2) // 2 + SUBSTR = ' ' * chunksize + '\n' + ' ' * chunksize + '\r\n' + s = SUBSTR * chunksize + l = s.splitlines() + self.assertEquals(len(l), chunksize * 2) + self.assertEquals(set(l), set([' ' * chunksize])) + + @bigmemtest(minsize=_2G, memuse=2) + def test_startswith(self, size): + SUBSTR = ' abc def ghi' + s = '-' * size + SUBSTR + self.failUnless(s.startswith(s)) + self.failUnless(s.startswith('-' * size)) + self.failIf(s.startswith(SUBSTR)) + + @bigmemtest(minsize=_2G, memuse=1) + def test_strip(self, size): + SUBSTR = ' abc def ghi ' + s = SUBSTR.rjust(size) + self.assertEquals(len(s), size) + self.assertEquals(s.strip(), SUBSTR.strip()) + del s + s = SUBSTR.ljust(size) + self.assertEquals(len(s), size) + self.assertEquals(s.strip(), SUBSTR.strip()) + + @bigmemtest(minsize=_2G, memuse=2) + def test_swapcase(self, size): + SUBSTR = "aBcDeFG12.'\xa9\x00" + sublen = len(SUBSTR) + repeats = size // sublen + 2 + s = SUBSTR * repeats + s = s.swapcase() + self.assertEquals(len(s), sublen * repeats) + self.assertEquals(s[:sublen * 3], SUBSTR.swapcase() * 3) + self.assertEquals(s[-sublen * 3:], SUBSTR.swapcase() * 3) + + @bigmemtest(minsize=_2G, memuse=2) + def test_title(self, size): + SUBSTR = 'SpaaHAaaAaham' + s = SUBSTR * (size // len(SUBSTR) + 2) + s = s.title() + self.failUnless(s.startswith((SUBSTR * 3).title())) + self.failUnless(s.endswith(SUBSTR.lower() * 3)) + + @bigmemtest(minsize=_2G, memuse=2) + def test_translate(self, size): + trans = string.maketrans('.aZ', '-!$') + SUBSTR = 'aZz.z.Aaz.' + sublen = len(SUBSTR) + repeats = size // sublen + 2 + s = SUBSTR * repeats + s = s.translate(trans) + self.assertEquals(len(s), repeats * sublen) + self.assertEquals(s[:sublen], SUBSTR.translate(trans)) + self.assertEquals(s[-sublen:], SUBSTR.translate(trans)) + self.assertEquals(s.count('.'), 0) + self.assertEquals(s.count('!'), repeats * 2) + self.assertEquals(s.count('z'), repeats * 3) + + @bigmemtest(minsize=_2G + 5, memuse=2) + def test_upper(self, size): + s = 'a' * size + s = s.upper() + self.assertEquals(len(s), size) + self.assertEquals(s.count('A'), size) + + @bigmemtest(minsize=_2G + 20, memuse=1) + def test_zfill(self, size): + SUBSTR = '-568324723598234' + s = SUBSTR.zfill(size) + self.failUnless(s.endswith('0' + SUBSTR[1:])) + self.failUnless(s.startswith('-0')) + self.assertEquals(len(s), size) + self.assertEquals(s.count('0'), size - len(SUBSTR)) + + @bigmemtest(minsize=_2G + 10, memuse=2) + def test_format(self, size): + s = '-' * size + sf = '%s' % (s,) + self.failUnless(s == sf) + del sf + sf = '..%s..' % (s,) + self.assertEquals(len(sf), len(s) + 4) + self.failUnless(sf.startswith('..-')) + self.failUnless(sf.endswith('-..')) + del s, sf + + size //= 2 + edge = '-' * size + s = ''.join([edge, '%s', edge]) + del edge + s = s % '...' + self.assertEquals(len(s), size * 2 + 3) + self.assertEquals(s.count('.'), 3) + self.assertEquals(s.count('-'), size * 2) + + @bigmemtest(minsize=_2G + 10, memuse=2) + def test_repr_small(self, size): + s = '-' * size + s = repr(s) + self.assertEquals(len(s), size + 2) + self.assertEquals(s[0], "'") + self.assertEquals(s[-1], "'") + self.assertEquals(s.count('-'), size) + del s + # repr() will create a string four times as large as this 'binary + # string', but we don't want to allocate much more than twice + # size in total. (We do extra testing in test_repr_large()) + size = size // 5 * 2 + s = '\x00' * size + s = repr(s) + self.assertEquals(len(s), size * 4 + 2) + self.assertEquals(s[0], "'") + self.assertEquals(s[-1], "'") + self.assertEquals(s.count('\\'), size) + self.assertEquals(s.count('0'), size * 2) + + @bigmemtest(minsize=_2G + 10, memuse=5) + def test_repr_large(self, size): + s = '\x00' * size + s = repr(s) + self.assertEquals(len(s), size * 4 + 2) + self.assertEquals(s[0], "'") + self.assertEquals(s[-1], "'") + self.assertEquals(s.count('\\'), size) + self.assertEquals(s.count('0'), size * 2) + + # This test is meaningful even with size < 2G, as long as the + # doubled string is > 2G (but it tests more if both are > 2G :) + @bigmemtest(minsize=_1G + 2, memuse=3) + def test_concat(self, size): + s = '.' * size + self.assertEquals(len(s), size) + s = s + s + self.assertEquals(len(s), size * 2) + self.assertEquals(s.count('.'), size * 2) + + # This test is meaningful even with size < 2G, as long as the + # repeated string is > 2G (but it tests more if both are > 2G :) + @bigmemtest(minsize=_1G + 2, memuse=3) + def test_repeat(self, size): + s = '.' * size + self.assertEquals(len(s), size) + s = s * 2 + self.assertEquals(len(s), size * 2) + self.assertEquals(s.count('.'), size * 2) + + @bigmemtest(minsize=_2G + 20, memuse=1) + def test_slice_and_getitem(self, size): + SUBSTR = '0123456789' + sublen = len(SUBSTR) + s = SUBSTR * (size // sublen) + stepsize = len(s) // 100 + stepsize = stepsize - (stepsize % sublen) + for i in range(0, len(s) - stepsize, stepsize): + self.assertEquals(s[i], SUBSTR[0]) + self.assertEquals(s[i:i + sublen], SUBSTR) + self.assertEquals(s[i:i + sublen:2], SUBSTR[::2]) + if i > 0: + self.assertEquals(s[i + sublen - 1:i - 1:-3], + SUBSTR[sublen::-3]) + # Make sure we do some slicing and indexing near the end of the + # string, too. + self.assertEquals(s[len(s) - 1], SUBSTR[-1]) + self.assertEquals(s[-1], SUBSTR[-1]) + self.assertEquals(s[len(s) - 10], SUBSTR[0]) + self.assertEquals(s[-sublen], SUBSTR[0]) + self.assertEquals(s[len(s):], '') + self.assertEquals(s[len(s) - 1:], SUBSTR[-1]) + self.assertEquals(s[-1:], SUBSTR[-1]) + self.assertEquals(s[len(s) - sublen:], SUBSTR) + self.assertEquals(s[-sublen:], SUBSTR) + self.assertEquals(len(s[:]), len(s)) + self.assertEquals(len(s[:len(s) - 5]), len(s) - 5) + self.assertEquals(len(s[5:-5]), len(s) - 10) + + self.assertRaises(IndexError, operator.getitem, s, len(s)) + self.assertRaises(IndexError, operator.getitem, s, len(s) + 1) + self.assertRaises(IndexError, operator.getitem, s, len(s) + 1<<31) + + @bigmemtest(minsize=_2G, memuse=2) + def test_contains(self, size): + SUBSTR = '0123456789' + edge = '-' * (size // 2) + s = ''.join([edge, SUBSTR, edge]) + del edge + self.failUnless(SUBSTR in s) + self.failIf(SUBSTR * 2 in s) + self.failUnless('-' in s) + self.failIf('a' in s) + s += 'a' + self.failUnless('a' in s) + + @bigmemtest(minsize=_2G + 10, memuse=2) + def test_compare(self, size): + s1 = '-' * size + s2 = '-' * size + self.failUnless(s1 == s2) + del s2 + s2 = s1 + 'a' + self.failIf(s1 == s2) + del s2 + s2 = '.' * size + self.failIf(s1 == s2) + + @bigmemtest(minsize=_2G + 10, memuse=1) + def test_hash(self, size): + # Not sure if we can do any meaningful tests here... Even if we + # start relying on the exact algorithm used, the result will be + # different depending on the size of the C 'long int'. Even this + # test is dodgy (there's no *guarantee* that the two things should + # have a different hash, even if they, in the current + # implementation, almost always do.) + s = '\x00' * size + h1 = hash(s) + del s + s = '\x00' * (size + 1) + self.failIf(h1 == hash(s)) + +class TupleTest(unittest.TestCase): + + # Tuples have a small, fixed-sized head and an array of pointers to + # data. Since we're testing 64-bit addressing, we can assume that the + # pointers are 8 bytes, and that thus that the tuples take up 8 bytes + # per size. + + # As a side-effect of testing long tuples, these tests happen to test + # having more than 2<<31 references to any given object. Hence the + # use of different types of objects as contents in different tests. + + @bigmemtest(minsize=_2G + 2, memuse=16) + def test_compare(self, size): + t1 = (u'',) * size + t2 = (u'',) * size + self.failUnless(t1 == t2) + del t2 + t2 = (u'',) * (size + 1) + self.failIf(t1 == t2) + del t2 + t2 = (1,) * size + self.failIf(t1 == t2) + + # Test concatenating into a single tuple of more than 2G in length, + # and concatenating a tuple of more than 2G in length separately, so + # the smaller test still gets run even if there isn't memory for the + # larger test (but we still let the tester know the larger test is + # skipped, in verbose mode.) + def basic_concat_test(self, size): + t = ((),) * size + self.assertEquals(len(t), size) + t = t + t + self.assertEquals(len(t), size * 2) + + @bigmemtest(minsize=_2G // 2 + 2, memuse=24) + def test_concat_small(self, size): + return self.basic_concat_test(size) + + @bigmemtest(minsize=_2G + 2, memuse=24) + def test_concat_large(self, size): + return self.basic_concat_test(size) + + @bigmemtest(minsize=_2G // 5 + 10, memuse=8 * 5) + def test_contains(self, size): + t = (1, 2, 3, 4, 5) * size + self.assertEquals(len(t), size * 5) + self.failUnless(5 in t) + self.failIf((1, 2, 3, 4, 5) in t) + self.failIf(0 in t) + + @bigmemtest(minsize=_2G + 10, memuse=8) + def test_hash(self, size): + t1 = (0,) * size + h1 = hash(t1) + del t1 + t2 = (0,) * (size + 1) + self.failIf(h1 == hash(t2)) + + @bigmemtest(minsize=_2G + 10, memuse=8) + def test_index_and_slice(self, size): + t = (None,) * size + self.assertEquals(len(t), size) + self.assertEquals(t[-1], None) + self.assertEquals(t[5], None) + self.assertEquals(t[size - 1], None) + self.assertRaises(IndexError, operator.getitem, t, size) + self.assertEquals(t[:5], (None,) * 5) + self.assertEquals(t[-5:], (None,) * 5) + self.assertEquals(t[20:25], (None,) * 5) + self.assertEquals(t[-25:-20], (None,) * 5) + self.assertEquals(t[size - 5:], (None,) * 5) + self.assertEquals(t[size - 5:size], (None,) * 5) + self.assertEquals(t[size - 6:size - 2], (None,) * 4) + self.assertEquals(t[size:size], ()) + self.assertEquals(t[size:size+5], ()) + + # Like test_concat, split in two. + def basic_test_repeat(self, size): + t = ('',) * size + self.assertEquals(len(t), size) + t = t * 2 + self.assertEquals(len(t), size * 2) + + @bigmemtest(minsize=_2G // 2 + 2, memuse=24) + def test_repeat_small(self, size): + return self.basic_test_repeat(size) + + @bigmemtest(minsize=_2G + 2, memuse=24) + def test_repeat_large(self, size): + return self.basic_test_repeat(size) + + # Like test_concat, split in two. + def basic_test_repr(self, size): + t = (0,) * size + s = repr(t) + # The repr of a tuple of 0's is exactly three times the tuple length. + self.assertEquals(len(s), size * 3) + self.assertEquals(s[:5], '(0, 0') + self.assertEquals(s[-5:], '0, 0)') + self.assertEquals(s.count('0'), size) + + @bigmemtest(minsize=_2G // 3 + 2, memuse=8 + 3) + def test_repr_small(self, size): + return self.basic_test_repr(size) + + @bigmemtest(minsize=_2G + 2, memuse=8 + 3) + def test_repr_large(self, size): + return self.basic_test_repr(size) + +class ListTest(unittest.TestCase): + + # Like tuples, lists have a small, fixed-sized head and an array of + # pointers to data, so 8 bytes per size. Also like tuples, we make the + # lists hold references to various objects to test their refcount + # limits. + + @bigmemtest(minsize=_2G + 2, memuse=16) + def test_compare(self, size): + l1 = [u''] * size + l2 = [u''] * size + self.failUnless(l1 == l2) + del l2 + l2 = [u''] * (size + 1) + self.failIf(l1 == l2) + del l2 + l2 = [2] * size + self.failIf(l1 == l2) + + # Test concatenating into a single list of more than 2G in length, + # and concatenating a list of more than 2G in length separately, so + # the smaller test still gets run even if there isn't memory for the + # larger test (but we still let the tester know the larger test is + # skipped, in verbose mode.) + def basic_test_concat(self, size): + l = [[]] * size + self.assertEquals(len(l), size) + l = l + l + self.assertEquals(len(l), size * 2) + + @bigmemtest(minsize=_2G // 2 + 2, memuse=24) + def test_concat_small(self, size): + return self.basic_test_concat(size) + + @bigmemtest(minsize=_2G + 2, memuse=24) + def test_concat_large(self, size): + return self.basic_test_concat(size) + + def basic_test_inplace_concat(self, size): + l = [sys.stdout] * size + l += l + self.assertEquals(len(l), size * 2) + self.failUnless(l[0] is l[-1]) + self.failUnless(l[size - 1] is l[size + 1]) + + @bigmemtest(minsize=_2G // 2 + 2, memuse=24) + def test_inplace_concat_small(self, size): + return self.basic_test_inplace_concat(size) + + @bigmemtest(minsize=_2G + 2, memuse=24) + def test_inplace_concat_large(self, size): + return self.basic_test_inplace_concat(size) + + @bigmemtest(minsize=_2G // 5 + 10, memuse=8 * 5) + def test_contains(self, size): + l = [1, 2, 3, 4, 5] * size + self.assertEquals(len(l), size * 5) + self.failUnless(5 in l) + self.failIf([1, 2, 3, 4, 5] in l) + self.failIf(0 in l) + + @bigmemtest(minsize=_2G + 10, memuse=8) + def test_hash(self, size): + l = [0] * size + self.failUnlessRaises(TypeError, hash, l) + + @bigmemtest(minsize=_2G + 10, memuse=8) + def test_index_and_slice(self, size): + l = [None] * size + self.assertEquals(len(l), size) + self.assertEquals(l[-1], None) + self.assertEquals(l[5], None) + self.assertEquals(l[size - 1], None) + self.assertRaises(IndexError, operator.getitem, l, size) + self.assertEquals(l[:5], [None] * 5) + self.assertEquals(l[-5:], [None] * 5) + self.assertEquals(l[20:25], [None] * 5) + self.assertEquals(l[-25:-20], [None] * 5) + self.assertEquals(l[size - 5:], [None] * 5) + self.assertEquals(l[size - 5:size], [None] * 5) + self.assertEquals(l[size - 6:size - 2], [None] * 4) + self.assertEquals(l[size:size], []) + self.assertEquals(l[size:size+5], []) + + l[size - 2] = 5 + self.assertEquals(len(l), size) + self.assertEquals(l[-3:], [None, 5, None]) + self.assertEquals(l.count(5), 1) + self.assertRaises(IndexError, operator.setitem, l, size, 6) + self.assertEquals(len(l), size) + + l[size - 7:] = [1, 2, 3, 4, 5] + size -= 2 + self.assertEquals(len(l), size) + self.assertEquals(l[-7:], [None, None, 1, 2, 3, 4, 5]) + + l[:7] = [1, 2, 3, 4, 5] + size -= 2 + self.assertEquals(len(l), size) + self.assertEquals(l[:7], [1, 2, 3, 4, 5, None, None]) + + del l[size - 1] + size -= 1 + self.assertEquals(len(l), size) + self.assertEquals(l[-1], 4) + + del l[-2:] + size -= 2 + self.assertEquals(len(l), size) + self.assertEquals(l[-1], 2) + + del l[0] + size -= 1 + self.assertEquals(len(l), size) + self.assertEquals(l[0], 2) + + del l[:2] + size -= 2 + self.assertEquals(len(l), size) + self.assertEquals(l[0], 4) + + # Like test_concat, split in two. + def basic_test_repeat(self, size): + l = [] * size + self.failIf(l) + l = [''] * size + self.assertEquals(len(l), size) + l = l * 2 + self.assertEquals(len(l), size * 2) + + @bigmemtest(minsize=_2G // 2 + 2, memuse=24) + def test_repeat_small(self, size): + return self.basic_test_repeat(size) + + @bigmemtest(minsize=_2G + 2, memuse=24) + def test_repeat_large(self, size): + return self.basic_test_repeat(size) + + def basic_test_inplace_repeat(self, size): + l = [''] + l *= size + self.assertEquals(len(l), size) + self.failUnless(l[0] is l[-1]) + del l + + l = [''] * size + l *= 2 + self.assertEquals(len(l), size * 2) + self.failUnless(l[size - 1] is l[-1]) + + @bigmemtest(minsize=_2G // 2 + 2, memuse=16) + def test_inplace_repeat_small(self, size): + return self.basic_test_inplace_repeat(size) + + @bigmemtest(minsize=_2G + 2, memuse=16) + def test_inplace_repeat_large(self, size): + return self.basic_test_inplace_repeat(size) + + def basic_test_repr(self, size): + l = [0] * size + s = repr(l) + # The repr of a list of 0's is exactly three times the list length. + self.assertEquals(len(s), size * 3) + self.assertEquals(s[:5], '[0, 0') + self.assertEquals(s[-5:], '0, 0]') + self.assertEquals(s.count('0'), size) + + @bigmemtest(minsize=_2G // 3 + 2, memuse=8 + 3) + def test_repr_small(self, size): + return self.basic_test_repr(size) + + @bigmemtest(minsize=_2G + 2, memuse=8 + 3) + def test_repr_large(self, size): + return self.basic_test_repr(size) + + # list overallocates ~1/8th of the total size (on first expansion) so + # the single list.append call puts memuse at 9 bytes per size. + @bigmemtest(minsize=_2G, memuse=9) + def test_append(self, size): + l = [object()] * size + l.append(object()) + self.assertEquals(len(l), size+1) + self.failUnless(l[-3] is l[-2]) + self.failIf(l[-2] is l[-1]) + + @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5) + def test_count(self, size): + l = [1, 2, 3, 4, 5] * size + self.assertEquals(l.count(1), size) + self.assertEquals(l.count("1"), 0) + + def basic_test_extend(self, size): + l = [file] * size + l.extend(l) + self.assertEquals(len(l), size * 2) + self.failUnless(l[0] is l[-1]) + self.failUnless(l[size - 1] is l[size + 1]) + + @bigmemtest(minsize=_2G // 2 + 2, memuse=16) + def test_extend_small(self, size): + return self.basic_test_extend(size) + + @bigmemtest(minsize=_2G + 2, memuse=16) + def test_extend_large(self, size): + return self.basic_test_extend(size) + + @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5) + def test_index(self, size): + l = [1L, 2L, 3L, 4L, 5L] * size + size *= 5 + self.assertEquals(l.index(1), 0) + self.assertEquals(l.index(5, size - 5), size - 1) + self.assertEquals(l.index(5, size - 5, size), size - 1) + self.assertRaises(ValueError, l.index, 1, size - 4, size) + self.assertRaises(ValueError, l.index, 6L) + + # This tests suffers from overallocation, just like test_append. + @bigmemtest(minsize=_2G + 10, memuse=9) + def test_insert(self, size): + l = [1.0] * size + l.insert(size - 1, "A") + size += 1 + self.assertEquals(len(l), size) + self.assertEquals(l[-3:], [1.0, "A", 1.0]) + + l.insert(size + 1, "B") + size += 1 + self.assertEquals(len(l), size) + self.assertEquals(l[-3:], ["A", 1.0, "B"]) + + l.insert(1, "C") + size += 1 + self.assertEquals(len(l), size) + self.assertEquals(l[:3], [1.0, "C", 1.0]) + self.assertEquals(l[size - 3:], ["A", 1.0, "B"]) + + @bigmemtest(minsize=_2G // 5 + 4, memuse=8 * 5) + def test_pop(self, size): + l = [u"a", u"b", u"c", u"d", u"e"] * size + size *= 5 + self.assertEquals(len(l), size) + + item = l.pop() + size -= 1 + self.assertEquals(len(l), size) + self.assertEquals(item, u"e") + self.assertEquals(l[-2:], [u"c", u"d"]) + + item = l.pop(0) + size -= 1 + self.assertEquals(len(l), size) + self.assertEquals(item, u"a") + self.assertEquals(l[:2], [u"b", u"c"]) + + item = l.pop(size - 2) + size -= 1 + self.assertEquals(len(l), size) + self.assertEquals(item, u"c") + self.assertEquals(l[-2:], [u"b", u"d"]) + + @bigmemtest(minsize=_2G + 10, memuse=8) + def test_remove(self, size): + l = [10] * size + self.assertEquals(len(l), size) + + l.remove(10) + size -= 1 + self.assertEquals(len(l), size) + + # Because of the earlier l.remove(), this append doesn't trigger + # a resize. + l.append(5) + size += 1 + self.assertEquals(len(l), size) + self.assertEquals(l[-2:], [10, 5]) + l.remove(5) + size -= 1 + self.assertEquals(len(l), size) + self.assertEquals(l[-2:], [10, 10]) + + @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5) + def test_reverse(self, size): + l = [1, 2, 3, 4, 5] * size + l.reverse() + self.assertEquals(len(l), size * 5) + self.assertEquals(l[-5:], [5, 4, 3, 2, 1]) + self.assertEquals(l[:5], [5, 4, 3, 2, 1]) + + @bigmemtest(minsize=_2G // 5 + 2, memuse=8 * 5) + def test_sort(self, size): + l = [1, 2, 3, 4, 5] * size + l.sort() + self.assertEquals(len(l), size * 5) + self.assertEquals(l.count(1), size) + self.assertEquals(l[:10], [1] * 10) + self.assertEquals(l[-10:], [5] * 10) + +def test_main(): + test_support.run_unittest(StrTest, TupleTest, ListTest) + +if __name__ == '__main__': + if len(sys.argv) > 1: + test_support.set_memlimit(sys.argv[1]) + test_main() diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index ef4f407..71e2b0a 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -1,7 +1,8 @@ # Python test set -- built-in functions import test.test_support, unittest -from test.test_support import fcmp, have_unicode, TESTFN, unlink, run_unittest +from test.test_support import fcmp, have_unicode, TESTFN, unlink, \ + run_unittest, run_with_locale from operator import neg import sys, warnings, cStringIO, random, UserDict @@ -528,33 +529,20 @@ class BuiltinTest(unittest.TestCase): # Implementation limitation in PyFloat_FromString() self.assertRaises(ValueError, float, unicode("1"*10000)) + @run_with_locale('LC_NUMERIC', 'fr_FR', 'de_DE') def test_float_with_comma(self): # set locale to something that doesn't use '.' for the decimal point - try: - import locale - orig_locale = locale.setlocale(locale.LC_NUMERIC) - locale.setlocale(locale.LC_NUMERIC, 'fr_FR') - except: - # if we can't set the locale, just ignore this test - return - - try: - self.assertEqual(locale.localeconv()['decimal_point'], ',') - except: - # this test is worthless, just skip it and reset the locale - locale.setlocale(locale.LC_NUMERIC, orig_locale) + import locale + if not locale.localeconv()['decimal_point'] == ',': return - try: - self.assertEqual(float(" 3,14 "), 3.14) - self.assertEqual(float(" +3,14 "), 3.14) - self.assertEqual(float(" -3,14 "), -3.14) - self.assertRaises(ValueError, float, " 0x3.1 ") - self.assertRaises(ValueError, float, " -0x3.p-1 ") - self.assertEqual(float(" 25.e-1 "), 2.5) - self.assertEqual(fcmp(float(" .25e-1 "), .025), 0) - finally: - locale.setlocale(locale.LC_NUMERIC, orig_locale) + self.assertEqual(float(" 3,14 "), 3.14) + self.assertEqual(float(" +3,14 "), 3.14) + self.assertEqual(float(" -3,14 "), -3.14) + self.assertRaises(ValueError, float, " 0x3.1 ") + self.assertRaises(ValueError, float, " -0x3.p-1 ") + self.assertEqual(float(" 25.e-1 "), 2.5) + self.assertEqual(fcmp(float(" .25e-1 "), .025), 0) def test_floatconversion(self): # Make sure that calls to __float__() work properly @@ -693,6 +681,84 @@ class BuiltinTest(unittest.TestCase): self.assertEqual(int('0123', 0), 83) self.assertEqual(int('0x123', 16), 291) + # SF bug 1334662: int(string, base) wrong answers + # Various representations of 2**32 evaluated to 0 + # rather than 2**32 in previous versions + + self.assertEqual(int('100000000000000000000000000000000', 2), 4294967296L) + self.assertEqual(int('102002022201221111211', 3), 4294967296L) + self.assertEqual(int('10000000000000000', 4), 4294967296L) + self.assertEqual(int('32244002423141', 5), 4294967296L) + self.assertEqual(int('1550104015504', 6), 4294967296L) + self.assertEqual(int('211301422354', 7), 4294967296L) + self.assertEqual(int('40000000000', 8), 4294967296L) + self.assertEqual(int('12068657454', 9), 4294967296L) + self.assertEqual(int('4294967296', 10), 4294967296L) + self.assertEqual(int('1904440554', 11), 4294967296L) + self.assertEqual(int('9ba461594', 12), 4294967296L) + self.assertEqual(int('535a79889', 13), 4294967296L) + self.assertEqual(int('2ca5b7464', 14), 4294967296L) + self.assertEqual(int('1a20dcd81', 15), 4294967296L) + self.assertEqual(int('100000000', 16), 4294967296L) + self.assertEqual(int('a7ffda91', 17), 4294967296L) + self.assertEqual(int('704he7g4', 18), 4294967296L) + self.assertEqual(int('4f5aff66', 19), 4294967296L) + self.assertEqual(int('3723ai4g', 20), 4294967296L) + self.assertEqual(int('281d55i4', 21), 4294967296L) + self.assertEqual(int('1fj8b184', 22), 4294967296L) + self.assertEqual(int('1606k7ic', 23), 4294967296L) + self.assertEqual(int('mb994ag', 24), 4294967296L) + self.assertEqual(int('hek2mgl', 25), 4294967296L) + self.assertEqual(int('dnchbnm', 26), 4294967296L) + self.assertEqual(int('b28jpdm', 27), 4294967296L) + self.assertEqual(int('8pfgih4', 28), 4294967296L) + self.assertEqual(int('76beigg', 29), 4294967296L) + self.assertEqual(int('5qmcpqg', 30), 4294967296L) + self.assertEqual(int('4q0jto4', 31), 4294967296L) + self.assertEqual(int('4000000', 32), 4294967296L) + self.assertEqual(int('3aokq94', 33), 4294967296L) + self.assertEqual(int('2qhxjli', 34), 4294967296L) + self.assertEqual(int('2br45qb', 35), 4294967296L) + self.assertEqual(int('1z141z4', 36), 4294967296L) + + # SF bug 1334662: int(string, base) wrong answers + # Checks for proper evaluation of 2**32 + 1 + self.assertEqual(int('100000000000000000000000000000001', 2), 4294967297L) + self.assertEqual(int('102002022201221111212', 3), 4294967297L) + self.assertEqual(int('10000000000000001', 4), 4294967297L) + self.assertEqual(int('32244002423142', 5), 4294967297L) + self.assertEqual(int('1550104015505', 6), 4294967297L) + self.assertEqual(int('211301422355', 7), 4294967297L) + self.assertEqual(int('40000000001', 8), 4294967297L) + self.assertEqual(int('12068657455', 9), 4294967297L) + self.assertEqual(int('4294967297', 10), 4294967297L) + self.assertEqual(int('1904440555', 11), 4294967297L) + self.assertEqual(int('9ba461595', 12), 4294967297L) + self.assertEqual(int('535a7988a', 13), 4294967297L) + self.assertEqual(int('2ca5b7465', 14), 4294967297L) + self.assertEqual(int('1a20dcd82', 15), 4294967297L) + self.assertEqual(int('100000001', 16), 4294967297L) + self.assertEqual(int('a7ffda92', 17), 4294967297L) + self.assertEqual(int('704he7g5', 18), 4294967297L) + self.assertEqual(int('4f5aff67', 19), 4294967297L) + self.assertEqual(int('3723ai4h', 20), 4294967297L) + self.assertEqual(int('281d55i5', 21), 4294967297L) + self.assertEqual(int('1fj8b185', 22), 4294967297L) + self.assertEqual(int('1606k7id', 23), 4294967297L) + self.assertEqual(int('mb994ah', 24), 4294967297L) + self.assertEqual(int('hek2mgm', 25), 4294967297L) + self.assertEqual(int('dnchbnn', 26), 4294967297L) + self.assertEqual(int('b28jpdn', 27), 4294967297L) + self.assertEqual(int('8pfgih5', 28), 4294967297L) + self.assertEqual(int('76beigh', 29), 4294967297L) + self.assertEqual(int('5qmcpqh', 30), 4294967297L) + self.assertEqual(int('4q0jto5', 31), 4294967297L) + self.assertEqual(int('4000001', 32), 4294967297L) + self.assertEqual(int('3aokq95', 33), 4294967297L) + self.assertEqual(int('2qhxjlj', 34), 4294967297L) + self.assertEqual(int('2br45qc', 35), 4294967297L) + self.assertEqual(int('1z141z5', 36), 4294967297L) + def test_intconversion(self): # Test __int__() class Foo0: @@ -886,6 +952,81 @@ class BuiltinTest(unittest.TestCase): self.assertRaises(ValueError, long, '53', 40) self.assertRaises(TypeError, long, 1, 12) + self.assertEqual(long('100000000000000000000000000000000', 2), + 4294967296) + self.assertEqual(long('102002022201221111211', 3), 4294967296) + self.assertEqual(long('10000000000000000', 4), 4294967296) + self.assertEqual(long('32244002423141', 5), 4294967296) + self.assertEqual(long('1550104015504', 6), 4294967296) + self.assertEqual(long('211301422354', 7), 4294967296) + self.assertEqual(long('40000000000', 8), 4294967296) + self.assertEqual(long('12068657454', 9), 4294967296) + self.assertEqual(long('4294967296', 10), 4294967296) + self.assertEqual(long('1904440554', 11), 4294967296) + self.assertEqual(long('9ba461594', 12), 4294967296) + self.assertEqual(long('535a79889', 13), 4294967296) + self.assertEqual(long('2ca5b7464', 14), 4294967296) + self.assertEqual(long('1a20dcd81', 15), 4294967296) + self.assertEqual(long('100000000', 16), 4294967296) + self.assertEqual(long('a7ffda91', 17), 4294967296) + self.assertEqual(long('704he7g4', 18), 4294967296) + self.assertEqual(long('4f5aff66', 19), 4294967296) + self.assertEqual(long('3723ai4g', 20), 4294967296) + self.assertEqual(long('281d55i4', 21), 4294967296) + self.assertEqual(long('1fj8b184', 22), 4294967296) + self.assertEqual(long('1606k7ic', 23), 4294967296) + self.assertEqual(long('mb994ag', 24), 4294967296) + self.assertEqual(long('hek2mgl', 25), 4294967296) + self.assertEqual(long('dnchbnm', 26), 4294967296) + self.assertEqual(long('b28jpdm', 27), 4294967296) + self.assertEqual(long('8pfgih4', 28), 4294967296) + self.assertEqual(long('76beigg', 29), 4294967296) + self.assertEqual(long('5qmcpqg', 30), 4294967296) + self.assertEqual(long('4q0jto4', 31), 4294967296) + self.assertEqual(long('4000000', 32), 4294967296) + self.assertEqual(long('3aokq94', 33), 4294967296) + self.assertEqual(long('2qhxjli', 34), 4294967296) + self.assertEqual(long('2br45qb', 35), 4294967296) + self.assertEqual(long('1z141z4', 36), 4294967296) + + self.assertEqual(long('100000000000000000000000000000001', 2), + 4294967297) + self.assertEqual(long('102002022201221111212', 3), 4294967297) + self.assertEqual(long('10000000000000001', 4), 4294967297) + self.assertEqual(long('32244002423142', 5), 4294967297) + self.assertEqual(long('1550104015505', 6), 4294967297) + self.assertEqual(long('211301422355', 7), 4294967297) + self.assertEqual(long('40000000001', 8), 4294967297) + self.assertEqual(long('12068657455', 9), 4294967297) + self.assertEqual(long('4294967297', 10), 4294967297) + self.assertEqual(long('1904440555', 11), 4294967297) + self.assertEqual(long('9ba461595', 12), 4294967297) + self.assertEqual(long('535a7988a', 13), 4294967297) + self.assertEqual(long('2ca5b7465', 14), 4294967297) + self.assertEqual(long('1a20dcd82', 15), 4294967297) + self.assertEqual(long('100000001', 16), 4294967297) + self.assertEqual(long('a7ffda92', 17), 4294967297) + self.assertEqual(long('704he7g5', 18), 4294967297) + self.assertEqual(long('4f5aff67', 19), 4294967297) + self.assertEqual(long('3723ai4h', 20), 4294967297) + self.assertEqual(long('281d55i5', 21), 4294967297) + self.assertEqual(long('1fj8b185', 22), 4294967297) + self.assertEqual(long('1606k7id', 23), 4294967297) + self.assertEqual(long('mb994ah', 24), 4294967297) + self.assertEqual(long('hek2mgm', 25), 4294967297) + self.assertEqual(long('dnchbnn', 26), 4294967297) + self.assertEqual(long('b28jpdn', 27), 4294967297) + self.assertEqual(long('8pfgih5', 28), 4294967297) + self.assertEqual(long('76beigh', 29), 4294967297) + self.assertEqual(long('5qmcpqh', 30), 4294967297) + self.assertEqual(long('4q0jto5', 31), 4294967297) + self.assertEqual(long('4000001', 32), 4294967297) + self.assertEqual(long('3aokq95', 33), 4294967297) + self.assertEqual(long('2qhxjlj', 34), 4294967297) + self.assertEqual(long('2br45qc', 35), 4294967297) + self.assertEqual(long('1z141z5', 36), 4294967297) + + def test_longconversion(self): # Test __long__() class Foo0: diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py index 018bec6..ec860d1 100644 --- a/Lib/test/test_cmd_line.py +++ b/Lib/test/test_cmd_line.py @@ -15,8 +15,11 @@ class CmdLineTest(unittest.TestCase): popen2._cleanup() return data - def exit_code(self, cmd_line): - return subprocess.call([sys.executable, cmd_line], stderr=subprocess.PIPE) + def exit_code(self, *args): + cmd_line = [sys.executable] + cmd_line.extend(args) + return subprocess.call(cmd_line, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) def test_directories(self): self.assertNotEqual(self.exit_code('.'), 0) @@ -50,6 +53,38 @@ class CmdLineTest(unittest.TestCase): version = 'Python %d.%d' % sys.version_info[:2] self.assertTrue(self.start_python('-V').startswith(version)) + def test_run_module(self): + # Test expected operation of the '-m' switch + # Switch needs an argument + self.assertNotEqual(self.exit_code('-m'), 0) + # Check we get an error for a nonexistent module + self.assertNotEqual( + self.exit_code('-m', 'fnord43520xyz'), + 0) + # Check the runpy module also gives an error for + # a nonexistent module + self.assertNotEqual( + self.exit_code('-m', 'runpy', 'fnord43520xyz'), + 0) + # All good if module is located and run successfully + self.assertEqual( + self.exit_code('-m', 'timeit', '-n', '1'), + 0) + + def test_run_code(self): + # Test expected operation of the '-c' switch + # Switch needs an argument + self.assertNotEqual(self.exit_code('-c'), 0) + # Check we get an error for an uncaught exception + self.assertNotEqual( + self.exit_code('-c', 'raise Exception'), + 0) + # All good if execution is successful + self.assertEqual( + self.exit_code('-c', 'pass'), + 0) + + def test_main(): test.test_support.run_unittest(CmdLineTest) diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py index c6e56c9..159c86d 100644 --- a/Lib/test/test_codeccallbacks.py +++ b/Lib/test/test_codeccallbacks.py @@ -18,30 +18,12 @@ class PosReturn: self.pos = len(exc.object) return (u"<?>", oldpos) -# A UnicodeEncodeError object without a start attribute -class NoStartUnicodeEncodeError(UnicodeEncodeError): - def __init__(self): - UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad") - del self.start - # A UnicodeEncodeError object with a bad start attribute class BadStartUnicodeEncodeError(UnicodeEncodeError): def __init__(self): UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad") self.start = [] -# A UnicodeEncodeError object without an end attribute -class NoEndUnicodeEncodeError(UnicodeEncodeError): - def __init__(self): - UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad") - del self.end - -# A UnicodeEncodeError object without an object attribute -class NoObjectUnicodeEncodeError(UnicodeEncodeError): - def __init__(self): - UnicodeEncodeError.__init__(self, "ascii", u"", 0, 1, "bad") - del self.object - # A UnicodeEncodeError object with a bad object attribute class BadObjectUnicodeEncodeError(UnicodeEncodeError): def __init__(self): @@ -478,55 +460,15 @@ class CodecCallbackTest(unittest.TestCase): UnicodeError("ouch") ) self.assertRaises( - AttributeError, - codecs.replace_errors, - NoStartUnicodeEncodeError() - ) - self.assertRaises( - TypeError, - codecs.replace_errors, - BadStartUnicodeEncodeError() - ) - self.assertRaises( - AttributeError, - codecs.replace_errors, - NoEndUnicodeEncodeError() - ) - self.assertRaises( - AttributeError, - codecs.replace_errors, - NoObjectUnicodeEncodeError() - ) - self.assertRaises( TypeError, codecs.replace_errors, BadObjectUnicodeEncodeError() ) self.assertRaises( - AttributeError, - codecs.replace_errors, - NoEndUnicodeDecodeError() - ) - self.assertRaises( TypeError, codecs.replace_errors, BadObjectUnicodeDecodeError() ) - self.assertRaises( - AttributeError, - codecs.replace_errors, - NoStartUnicodeTranslateError() - ) - self.assertRaises( - AttributeError, - codecs.replace_errors, - NoEndUnicodeTranslateError() - ) - self.assertRaises( - AttributeError, - codecs.replace_errors, - NoObjectUnicodeTranslateError() - ) # With the correct exception, "replace" returns an "?" or u"\ufffd" replacement self.assertEquals( codecs.replace_errors(UnicodeEncodeError("ascii", u"\u3042", 0, 1, "ouch")), @@ -565,21 +507,6 @@ class CodecCallbackTest(unittest.TestCase): codecs.xmlcharrefreplace_errors, UnicodeTranslateError(u"\u3042", 0, 1, "ouch") ) - self.assertRaises( - AttributeError, - codecs.xmlcharrefreplace_errors, - NoStartUnicodeEncodeError() - ) - self.assertRaises( - AttributeError, - codecs.xmlcharrefreplace_errors, - NoEndUnicodeEncodeError() - ) - self.assertRaises( - AttributeError, - codecs.xmlcharrefreplace_errors, - NoObjectUnicodeEncodeError() - ) # Use the correct exception cs = (0, 1, 9, 10, 99, 100, 999, 1000, 9999, 10000, 0x3042) s = "".join(unichr(c) for c in cs) diff --git a/Lib/test/test_codecencodings_cn.py b/Lib/test/test_codecencodings_cn.py index 0638f4f..1bf8583 100644 --- a/Lib/test/test_codecencodings_cn.py +++ b/Lib/test/test_codecencodings_cn.py @@ -3,7 +3,6 @@ # test_codecencodings_cn.py # Codec encoding tests for PRC encodings. # -# $CJKCodecs: test_codecencodings_cn.py,v 1.2 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecencodings_hk.py b/Lib/test/test_codecencodings_hk.py index e7fad90..1cd020f 100644 --- a/Lib/test/test_codecencodings_hk.py +++ b/Lib/test/test_codecencodings_hk.py @@ -3,7 +3,6 @@ # test_codecencodings_hk.py # Codec encoding tests for HongKong encodings. # -# $CJKCodecs: test_codecencodings_hk.py,v 1.1 2004/07/10 17:35:20 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecencodings_jp.py b/Lib/test/test_codecencodings_jp.py index 483b7db..558598a 100644 --- a/Lib/test/test_codecencodings_jp.py +++ b/Lib/test/test_codecencodings_jp.py @@ -3,7 +3,6 @@ # test_codecencodings_jp.py # Codec encoding tests for Japanese encodings. # -# $CJKCodecs: test_codecencodings_jp.py,v 1.3 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecencodings_kr.py b/Lib/test/test_codecencodings_kr.py index 489c9f1..8139f76 100644 --- a/Lib/test/test_codecencodings_kr.py +++ b/Lib/test/test_codecencodings_kr.py @@ -3,7 +3,6 @@ # test_codecencodings_kr.py # Codec encoding tests for ROK encodings. # -# $CJKCodecs: test_codecencodings_kr.py,v 1.2 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecencodings_tw.py b/Lib/test/test_codecencodings_tw.py index fb8a4d0..7c59478 100644 --- a/Lib/test/test_codecencodings_tw.py +++ b/Lib/test/test_codecencodings_tw.py @@ -3,7 +3,6 @@ # test_codecencodings_tw.py # Codec encoding tests for ROC encodings. # -# $CJKCodecs: test_codecencodings_tw.py,v 1.2 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecmaps_cn.py b/Lib/test/test_codecmaps_cn.py index 25ecc02..8cbee76 100644 --- a/Lib/test/test_codecmaps_cn.py +++ b/Lib/test/test_codecmaps_cn.py @@ -3,7 +3,6 @@ # test_codecmaps_cn.py # Codec mapping tests for PRC encodings # -# $CJKCodecs: test_codecmaps_cn.py,v 1.3 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecmaps_hk.py b/Lib/test/test_codecmaps_hk.py index 2335c51..e7f7b96 100644 --- a/Lib/test/test_codecmaps_hk.py +++ b/Lib/test/test_codecmaps_hk.py @@ -3,7 +3,6 @@ # test_codecmaps_hk.py # Codec mapping tests for HongKong encodings # -# $CJKCodecs: test_codecmaps_hk.py,v 1.1 2004/07/10 17:35:20 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecmaps_jp.py b/Lib/test/test_codecmaps_jp.py index e75a5a8..08052d4 100644 --- a/Lib/test/test_codecmaps_jp.py +++ b/Lib/test/test_codecmaps_jp.py @@ -3,7 +3,6 @@ # test_codecmaps_jp.py # Codec mapping tests for Japanese encodings # -# $CJKCodecs: test_codecmaps_jp.py,v 1.3 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecmaps_kr.py b/Lib/test/test_codecmaps_kr.py index db65c01..7484a66 100644 --- a/Lib/test/test_codecmaps_kr.py +++ b/Lib/test/test_codecmaps_kr.py @@ -3,7 +3,6 @@ # test_codecmaps_kr.py # Codec mapping tests for ROK encodings # -# $CJKCodecs: test_codecmaps_kr.py,v 1.3 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_codecmaps_tw.py b/Lib/test/test_codecmaps_tw.py index 2d469b0..0b195f4 100644 --- a/Lib/test/test_codecmaps_tw.py +++ b/Lib/test/test_codecmaps_tw.py @@ -3,7 +3,6 @@ # test_codecmaps_tw.py # Codec mapping tests for ROC encodings # -# $CJKCodecs: test_codecmaps_tw.py,v 1.3 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_compiler.py b/Lib/test/test_compiler.py index a59d6aa..48f1643 100644 --- a/Lib/test/test_compiler.py +++ b/Lib/test/test_compiler.py @@ -26,6 +26,7 @@ class CompilerTest(unittest.TestCase): next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL print >>sys.__stdout__, \ ' testCompileLibrary still working, be patient...' + sys.__stdout__.flush() if not basename.endswith(".py"): continue @@ -55,6 +56,9 @@ class CompilerTest(unittest.TestCase): def testYieldExpr(self): compiler.compile("def g(): yield\n\n", "<string>", "exec") + def testDefaultArgs(self): + self.assertRaises(SyntaxError, compiler.parse, "def foo(a=1, b): pass") + def testLineNo(self): # Test that all nodes except Module have a correct lineno attribute. filename = __file__ diff --git a/Lib/test/test_contextlib.py b/Lib/test/test_contextlib.py index 97470c7..2cf39ae 100644 --- a/Lib/test/test_contextlib.py +++ b/Lib/test/test_contextlib.py @@ -51,7 +51,7 @@ class ContextManagerTestCase(unittest.TestCase): @contextmanager def whee(): yield - ctx = whee().__context__() + ctx = whee() ctx.__enter__() # Calling __exit__ should not result in an exception self.failIf(ctx.__exit__(TypeError, TypeError("foo"), None)) @@ -63,7 +63,7 @@ class ContextManagerTestCase(unittest.TestCase): yield except: yield - ctx = whoo().__context__() + ctx = whoo() ctx.__enter__() self.assertRaises( RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None @@ -146,6 +146,29 @@ class NestedTestCase(unittest.TestCase): else: self.fail("Didn't raise ZeroDivisionError") + def test_nested_right_exception(self): + state = [] + @contextmanager + def a(): + yield 1 + class b(object): + def __enter__(self): + return 2 + def __exit__(self, *exc_info): + try: + raise Exception() + except: + pass + try: + with nested(a(), b()) as (x, y): + 1/0 + except ZeroDivisionError: + self.assertEqual((x, y), (1, 2)) + except Exception: + self.fail("Reraised wrong exception") + else: + self.fail("Didn't raise ZeroDivisionError") + def test_nested_b_swallows(self): @contextmanager def a(): @@ -316,12 +339,12 @@ class DecimalContextTestCase(unittest.TestCase): orig_context = ctx.copy() try: ctx.prec = save_prec = decimal.ExtendedContext.prec + 5 - with decimal.ExtendedContext: + with decimal.ExtendedContext.get_manager(): self.assertEqual(decimal.getcontext().prec, decimal.ExtendedContext.prec) self.assertEqual(decimal.getcontext().prec, save_prec) try: - with decimal.ExtendedContext: + with decimal.ExtendedContext.get_manager(): self.assertEqual(decimal.getcontext().prec, decimal.ExtendedContext.prec) 1/0 diff --git a/Lib/test/test_cookielib.py b/Lib/test/test_cookielib.py index 49e7d47..991506c 100644 --- a/Lib/test/test_cookielib.py +++ b/Lib/test/test_cookielib.py @@ -695,6 +695,22 @@ class CookieTests(TestCase): 'foo=bar; domain=friendly.org; Version="1"') self.assertEquals(len(c), 0) + def test_strict_domain(self): + # Cookies whose domain is a country-code tld like .co.uk should + # not be set if CookiePolicy.strict_domain is true. + from cookielib import CookieJar, DefaultCookiePolicy + + cp = DefaultCookiePolicy(strict_domain=True) + cj = CookieJar(policy=cp) + interact_netscape(cj, "http://example.co.uk/", 'no=problemo') + interact_netscape(cj, "http://example.co.uk/", + 'okey=dokey; Domain=.example.co.uk') + self.assertEquals(len(cj), 2) + for pseudo_tld in [".co.uk", ".org.za", ".tx.us", ".name.us"]: + interact_netscape(cj, "http://example.%s/" % pseudo_tld, + 'spam=eggs; Domain=.co.uk') + self.assertEquals(len(cj), 2) + def test_two_component_domain_ns(self): # Netscape: .www.bar.com, www.bar.com, .bar.com, bar.com, no domain # should all get accepted, as should .acme.com, acme.com and no domain diff --git a/Lib/test/test_datetime.py b/Lib/test/test_datetime.py index 2528b4a..203bea1 100644 --- a/Lib/test/test_datetime.py +++ b/Lib/test/test_datetime.py @@ -1400,6 +1400,12 @@ class TestDateTime(TestDate): got = self.theclass.utcfromtimestamp(ts) self.verify_field_equality(expected, got) + def test_microsecond_rounding(self): + # Test whether fromtimestamp "rounds up" floats that are less + # than one microsecond smaller than an integer. + self.assertEquals(self.theclass.fromtimestamp(0.9999999), + self.theclass.fromtimestamp(1)) + def test_insane_fromtimestamp(self): # It's possible that some platform maps time_t to double, # and that this test will fail there. This test should diff --git a/Lib/test/test_doctest.py b/Lib/test/test_doctest.py index b17607d..443c962 100644 --- a/Lib/test/test_doctest.py +++ b/Lib/test/test_doctest.py @@ -1079,6 +1079,25 @@ output to match any substring in the actual output: ... # doctest: +NORMALIZE_WHITESPACE [0, 1, ..., 18, 19] +The SKIP flag causes an example to be skipped entirely. I.e., the +example is not run. It can be useful in contexts where doctest +examples serve as both documentation and test cases, and an example +should be included for documentation purposes, but should not be +checked (e.g., because its output is random, or depends on resources +which would be unavailable.) The SKIP flag can also be used for +'commenting out' broken examples. + + >>> import unavailable_resource # doctest: +SKIP + >>> unavailable_resource.do_something() # doctest: +SKIP + >>> unavailable_resource.blow_up() # doctest: +SKIP + Traceback (most recent call last): + ... + UncheckedBlowUpError: Nobody checks me. + + >>> import random + >>> print random.random() # doctest: +SKIP + 0.721216923889 + The REPORT_UDIFF flag causes failures that involve multi-line expected and actual outputs to be displayed using a unified diff: @@ -1281,6 +1300,26 @@ count as failures: ValueError: 2 (3, 5) +New option flags can also be registered, via register_optionflag(). Here +we reach into doctest's internals a bit. + + >>> unlikely = "UNLIKELY_OPTION_NAME" + >>> unlikely in doctest.OPTIONFLAGS_BY_NAME + False + >>> new_flag_value = doctest.register_optionflag(unlikely) + >>> unlikely in doctest.OPTIONFLAGS_BY_NAME + True + +Before 2.4.4/2.5, registering a name more than once erroneously created +more than one flag value. Here we verify that's fixed: + + >>> redundant_flag_value = doctest.register_optionflag(unlikely) + >>> redundant_flag_value == new_flag_value + True + +Clean up. + >>> del doctest.OPTIONFLAGS_BY_NAME[unlikely] + """ def option_directives(): r""" diff --git a/Lib/test/test_exceptions.py b/Lib/test/test_exceptions.py index 7946142..8f995f7 100644 --- a/Lib/test/test_exceptions.py +++ b/Lib/test/test_exceptions.py @@ -81,14 +81,6 @@ try: x = undefined_variable except NameError: pass r(OverflowError) -# XXX -# Obscure: in 2.2 and 2.3, this test relied on changing OverflowWarning -# into an error, in order to trigger OverflowError. In 2.4, OverflowWarning -# should no longer be generated, so the focus of the test shifts to showing -# that OverflowError *isn't* generated. OverflowWarning should be gone -# in Python 2.5, and then the filterwarnings() call, and this comment, -# should go away. -warnings.filterwarnings("error", "", OverflowWarning, __name__) x = 1 for dummy in range(128): x += x # this simply shouldn't blow up @@ -224,3 +216,88 @@ if not sys.platform.startswith('java'): test_capi3() unlink(TESTFN) + +# test that exception attributes are happy. +try: str(u'Hello \u00E1') +except Exception, e: sampleUnicodeEncodeError = e +try: unicode('\xff') +except Exception, e: sampleUnicodeDecodeError = e +exceptionList = [ + ( BaseException, (), { 'message' : '', 'args' : () }), + ( BaseException, (1, ), { 'message' : 1, 'args' : ( 1, ) }), + ( BaseException, ('foo', ), { 'message' : 'foo', 'args' : ( 'foo', ) }), + ( BaseException, ('foo', 1), { 'message' : '', 'args' : ( 'foo', 1 ) }), + ( SystemExit, ('foo',), { 'message' : 'foo', 'args' : ( 'foo', ), + 'code' : 'foo' }), + ( IOError, ('foo',), { 'message' : 'foo', 'args' : ( 'foo', ), }), + ( IOError, ('foo', 'bar'), { 'message' : '', + 'args' : ('foo', 'bar'), }), + ( IOError, ('foo', 'bar', 'baz'), + { 'message' : '', 'args' : ('foo', 'bar'), }), + ( EnvironmentError, ('errnoStr', 'strErrorStr', 'filenameStr'), + { 'message' : '', 'args' : ('errnoStr', 'strErrorStr'), + 'strerror' : 'strErrorStr', + 'errno' : 'errnoStr', 'filename' : 'filenameStr' }), + ( EnvironmentError, (1, 'strErrorStr', 'filenameStr'), + { 'message' : '', 'args' : (1, 'strErrorStr'), + 'strerror' : 'strErrorStr', 'errno' : 1, + 'filename' : 'filenameStr' }), + ( SyntaxError, ('msgStr',), + { 'message' : 'msgStr', 'args' : ('msgStr', ), + 'print_file_and_line' : None, 'msg' : 'msgStr', + 'filename' : None, 'lineno' : None, 'offset' : None, + 'text' : None }), + ( SyntaxError, ('msgStr', ('filenameStr', 'linenoStr', 'offsetStr', + 'textStr')), + { 'message' : '', 'args' : ('msgStr', ('filenameStr', + 'linenoStr', 'offsetStr', 'textStr' )), + 'print_file_and_line' : None, 'msg' : 'msgStr', + 'filename' : 'filenameStr', 'lineno' : 'linenoStr', + 'offset' : 'offsetStr', 'text' : 'textStr' }), + ( SyntaxError, ('msgStr', 'filenameStr', 'linenoStr', 'offsetStr', + 'textStr', 'print_file_and_lineStr'), + { 'message' : '', 'args' : ('msgStr', 'filenameStr', + 'linenoStr', 'offsetStr', 'textStr', + 'print_file_and_lineStr'), + 'print_file_and_line' : None, 'msg' : 'msgStr', + 'filename' : None, 'lineno' : None, 'offset' : None, + 'text' : None }), + ( UnicodeError, (), + { 'message' : '', 'args' : (), }), + ( sampleUnicodeEncodeError, + { 'message' : '', 'args' : ('ascii', u'Hello \xe1', 6, 7, + 'ordinal not in range(128)'), + 'encoding' : 'ascii', 'object' : u'Hello \xe1', + 'start' : 6, 'reason' : 'ordinal not in range(128)' }), + ( sampleUnicodeDecodeError, + { 'message' : '', 'args' : ('ascii', '\xff', 0, 1, + 'ordinal not in range(128)'), + 'encoding' : 'ascii', 'object' : '\xff', + 'start' : 0, 'reason' : 'ordinal not in range(128)' }), + ( UnicodeTranslateError, (u"\u3042", 0, 1, "ouch"), + { 'message' : '', 'args' : (u'\u3042', 0, 1, 'ouch'), + 'object' : u'\u3042', 'reason' : 'ouch', + 'start' : 0, 'end' : 1 }), + ] +try: + exceptionList.append( + ( WindowsError, (1, 'strErrorStr', 'filenameStr'), + { 'message' : '', 'args' : (1, 'strErrorStr'), + 'strerror' : 'strErrorStr', + 'errno' : 22, 'filename' : 'filenameStr', + 'winerror' : 1 })) +except NameError: pass + +for args in exceptionList: + expected = args[-1] + try: + if len(args) == 2: raise args[0] + else: raise apply(args[0], args[1]) + except BaseException, e: + for checkArgName in expected.keys(): + if repr(getattr(e, checkArgName)) != repr(expected[checkArgName]): + raise TestFailed('Checking exception arguments, exception ' + '"%s", attribute "%s" expected %s got %s.' % + ( repr(e), checkArgName, + repr(expected[checkArgName]), + repr(getattr(e, checkArgName)) )) diff --git a/Lib/test/test_file.py b/Lib/test/test_file.py index cfc1019..ca1c6ba 100644 --- a/Lib/test/test_file.py +++ b/Lib/test/test_file.py @@ -147,7 +147,7 @@ f.close() bad_mode = "qwerty" try: open(TESTFN, bad_mode) -except IOError, msg: +except ValueError, msg: if msg[0] != 0: s = str(msg) if s.find(TESTFN) != -1 or s.find(bad_mode) == -1: diff --git a/Lib/test/test_grp.py b/Lib/test/test_grp.py index 2c3ab29..08958ba 100755 --- a/Lib/test/test_grp.py +++ b/Lib/test/test_grp.py @@ -31,7 +31,10 @@ class GroupDatabaseTestCase(unittest.TestCase): self.assertEqual(e2.gr_gid, e.gr_gid) e2 = grp.getgrnam(e.gr_name) self.check_value(e2) - self.assertEqual(e2.gr_name, e.gr_name) + # There are instances where getgrall() returns group names in + # lowercase while getgrgid() returns proper casing. + # Discovered on Ubuntu 5.04 (custom). + self.assertEqual(e2.gr_name.lower(), e.gr_name.lower()) def test_errors(self): self.assertRaises(TypeError, grp.getgrgid) diff --git a/Lib/test/test_import.py b/Lib/test/test_import.py index a72b8bd..effba3c 100644 --- a/Lib/test/test_import.py +++ b/Lib/test/test_import.py @@ -205,3 +205,20 @@ def test_import_name_binding(): assert y is test.test_support, y.__name__ test_import_name_binding() + +def test_import_initless_directory_warning(): + import warnings + oldfilters = warnings.filters[:] + warnings.simplefilter('error', ImportWarning); + try: + # Just a random non-package directory we always expect to be + # somewhere in sys.path... + __import__("site-packages") + except ImportWarning: + pass + else: + raise AssertionError + finally: + warnings.filters = oldfilters + +test_import_initless_directory_warning() diff --git a/Lib/test/test_importhooks.py b/Lib/test/test_importhooks.py index 0693581..e8b4695 100644 --- a/Lib/test/test_importhooks.py +++ b/Lib/test/test_importhooks.py @@ -14,6 +14,7 @@ def get_file(): absimp = "import sub\n" relimp = "from . import sub\n" +deeprelimp = "from .... import sub\n" futimp = "from __future__ import absolute_import\n" reload_src = test_src+"""\ @@ -26,6 +27,7 @@ reload_co = compile(reload_src, "<???>", "exec") test2_oldabs_co = compile(absimp + test_src, "<???>", "exec") test2_newabs_co = compile(futimp + absimp + test_src, "<???>", "exec") test2_newrel_co = compile(relimp + test_src, "<???>", "exec") +test2_deeprel_co = compile(deeprelimp + test_src, "<???>", "exec") test2_futrel_co = compile(futimp + relimp + test_src, "<???>", "exec") test_path = "!!!_test_!!!" @@ -46,10 +48,11 @@ class TestImporter: "hooktestmodule": (False, test_co), "hooktestpackage": (True, test_co), "hooktestpackage.sub": (True, test_co), - "hooktestpackage.sub.subber": (False, test_co), + "hooktestpackage.sub.subber": (True, test_co), "hooktestpackage.oldabs": (False, test2_oldabs_co), "hooktestpackage.newabs": (False, test2_newabs_co), "hooktestpackage.newrel": (False, test2_newrel_co), + "hooktestpackage.sub.subber.subest": (True, test2_deeprel_co), "hooktestpackage.futrel": (False, test2_futrel_co), "sub": (False, test_co), "reloadmodule": (False, test_co), @@ -203,6 +206,12 @@ class ImportHooksTestCase(ImportHooksBaseTestCase): self.assertEqual(hooktestpackage.newrel.sub, hooktestpackage.sub) + import hooktestpackage.sub.subber.subest as subest + self.assertEqual(subest.get_name(), + "hooktestpackage.sub.subber.subest") + self.assertEqual(subest.sub, + hooktestpackage.sub) + import hooktestpackage.futrel self.assertEqual(hooktestpackage.futrel.get_name(), "hooktestpackage.futrel") diff --git a/Lib/test/test_locale.py b/Lib/test/test_locale.py index 1523e77..9e264b9 100644 --- a/Lib/test/test_locale.py +++ b/Lib/test/test_locale.py @@ -20,14 +20,14 @@ for tloc in tlocs: else: raise ImportError, "test locale not supported (tried %s)"%(', '.join(tlocs)) -def testformat(formatstr, value, grouping = 0, output=None): +def testformat(formatstr, value, grouping = 0, output=None, func=locale.format): if verbose: if output: print "%s %% %s =? %s ..." %\ (repr(formatstr), repr(value), repr(output)), else: print "%s %% %s works? ..." % (repr(formatstr), repr(value)), - result = locale.format(formatstr, value, grouping = grouping) + result = func(formatstr, value, grouping = grouping) if output and result != output: if verbose: print 'no' @@ -49,6 +49,30 @@ try: testformat("%-10.f", 4200, grouping=1, output='4%s200 ' % sep) # Invoke getpreferredencoding to make sure it does not cause exceptions, locale.getpreferredencoding() + + # === Test format() with more complex formatting strings + # test if grouping is independent from other characters in formatting string + testformat("One million is %i", 1000000, grouping=1, + output='One million is 1%s000%s000' % (sep, sep), + func=locale.format_string) + testformat("One million is %i", 1000000, grouping=1, + output='One million is 1%s000%s000' % (sep, sep), + func=locale.format_string) + # test dots in formatting string + testformat(".%f.", 1000.0, output='.1000.000000.', func=locale.format_string) + # test floats + testformat("--> %10.2f", 1000.0, grouping=1, output='--> 1%s000.00' % sep, + func=locale.format_string) + # test asterisk formats + testformat("%10.*f", (2, 1000.0), grouping=0, output=' 1000.00', + func=locale.format_string) + testformat("%*.*f", (10, 2, 1000.0), grouping=1, output=' 1%s000.00' % sep, + func=locale.format_string) + # test more-in-one + testformat("int %i float %.2f str %s", (1000, 1000.0, 'str'), grouping=1, + output='int 1%s000 float 1%s000.00 str str' % (sep, sep), + func=locale.format_string) + finally: locale.setlocale(locale.LC_NUMERIC, oldlocale) diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index b689dc8..73f8288 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -28,6 +28,7 @@ import select import os, sys, string, struct, types, cPickle, cStringIO import socket, tempfile, threading, time import logging, logging.handlers, logging.config +from test.test_support import run_with_locale BANNER = "-- %-10s %-6s ---------------------------------------------------\n" @@ -657,19 +658,11 @@ def test_main_inner(): pass rootLogger.removeHandler(hdlr) +# Set the locale to the platform-dependent default. I have no idea +# why the test does this, but in any case we save the current locale +# first and restore it at the end. +@run_with_locale('LC_ALL', '') def test_main(): - import locale - # Set the locale to the platform-dependent default. I have no idea - # why the test does this, but in any case we save the current locale - # first so we can restore it at the end. - try: - original_locale = locale.setlocale(locale.LC_ALL) - locale.setlocale(locale.LC_ALL, '') - except (ValueError, locale.Error): - # this happens on a Solaris box which only supports "C" locale - # or a Mac OS X box which supports very little locale stuff at all - original_locale = None - # Save and restore the original root logger level across the tests. # Otherwise, e.g., if any test using cookielib runs after test_logging, # cookielib's debug-level logger tries to log messages, leading to @@ -681,8 +674,6 @@ def test_main(): try: test_main_inner() finally: - if original_locale is not None: - locale.setlocale(locale.LC_ALL, original_locale) root_logger.setLevel(original_logging_level) if __name__ == "__main__": diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 77d39a6..914a20c 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -1,15 +1,1576 @@ -import mailbox import os import time -import unittest +import stat +import socket +import email +import email.Message +import rfc822 +import re +import StringIO from test import test_support - -# cleanup earlier tests +import unittest +import mailbox +import glob try: - os.unlink(test_support.TESTFN) -except os.error: + import fcntl +except ImportError: pass + +class TestBase(unittest.TestCase): + + def _check_sample(self, msg): + # Inspect a mailbox.Message representation of the sample message + self.assert_(isinstance(msg, email.Message.Message)) + self.assert_(isinstance(msg, mailbox.Message)) + for key, value in _sample_headers.iteritems(): + self.assert_(value in msg.get_all(key)) + self.assert_(msg.is_multipart()) + self.assert_(len(msg.get_payload()) == len(_sample_payloads)) + for i, payload in enumerate(_sample_payloads): + part = msg.get_payload(i) + self.assert_(isinstance(part, email.Message.Message)) + self.assert_(not isinstance(part, mailbox.Message)) + self.assert_(part.get_payload() == payload) + + def _delete_recursively(self, target): + # Delete a file or delete a directory recursively + if os.path.isdir(target): + for path, dirs, files in os.walk(target, topdown=False): + for name in files: + os.remove(os.path.join(path, name)) + for name in dirs: + os.rmdir(os.path.join(path, name)) + os.rmdir(target) + elif os.path.exists(target): + os.remove(target) + + +class TestMailbox(TestBase): + + _factory = None # Overridden by subclasses to reuse tests + _template = 'From: foo\n\n%s' + + def setUp(self): + self._path = test_support.TESTFN + self._box = self._factory(self._path) + + def tearDown(self): + self._box.close() + self._delete_recursively(self._path) + + def test_add(self): + # Add copies of a sample message + keys = [] + keys.append(self._box.add(self._template % 0)) + self.assert_(len(self._box) == 1) + keys.append(self._box.add(mailbox.Message(_sample_message))) + self.assert_(len(self._box) == 2) + keys.append(self._box.add(email.message_from_string(_sample_message))) + self.assert_(len(self._box) == 3) + keys.append(self._box.add(StringIO.StringIO(_sample_message))) + self.assert_(len(self._box) == 4) + keys.append(self._box.add(_sample_message)) + self.assert_(len(self._box) == 5) + self.assert_(self._box.get_string(keys[0]) == self._template % 0) + for i in (1, 2, 3, 4): + self._check_sample(self._box[keys[i]]) + + def test_remove(self): + # Remove messages using remove() + self._test_remove_or_delitem(self._box.remove) + + def test_delitem(self): + # Remove messages using __delitem__() + self._test_remove_or_delitem(self._box.__delitem__) + + def _test_remove_or_delitem(self, method): + # (Used by test_remove() and test_delitem().) + key0 = self._box.add(self._template % 0) + key1 = self._box.add(self._template % 1) + self.assert_(len(self._box) == 2) + method(key0) + l = len(self._box) + self.assert_(l == 1, "actual l: %s" % l) + self.assertRaises(KeyError, lambda: self._box[key0]) + self.assertRaises(KeyError, lambda: method(key0)) + self.assert_(self._box.get_string(key1) == self._template % 1) + key2 = self._box.add(self._template % 2) + self.assert_(len(self._box) == 2) + method(key2) + l = len(self._box) + self.assert_(l == 1, "actual l: %s" % l) + self.assertRaises(KeyError, lambda: self._box[key2]) + self.assertRaises(KeyError, lambda: method(key2)) + self.assert_(self._box.get_string(key1) == self._template % 1) + method(key1) + self.assert_(len(self._box) == 0) + self.assertRaises(KeyError, lambda: self._box[key1]) + self.assertRaises(KeyError, lambda: method(key1)) + + def test_discard(self, repetitions=10): + # Discard messages + key0 = self._box.add(self._template % 0) + key1 = self._box.add(self._template % 1) + self.assert_(len(self._box) == 2) + self._box.discard(key0) + self.assert_(len(self._box) == 1) + self.assertRaises(KeyError, lambda: self._box[key0]) + self._box.discard(key0) + self.assert_(len(self._box) == 1) + self.assertRaises(KeyError, lambda: self._box[key0]) + + def test_get(self): + # Retrieve messages using get() + key0 = self._box.add(self._template % 0) + msg = self._box.get(key0) + self.assert_(msg['from'] == 'foo') + self.assert_(msg.get_payload() == '0') + self.assert_(self._box.get('foo') is None) + self.assert_(self._box.get('foo', False) is False) + self._box.close() + self._box = self._factory(self._path, factory=rfc822.Message) + key1 = self._box.add(self._template % 1) + msg = self._box.get(key1) + self.assert_(msg['from'] == 'foo') + self.assert_(msg.fp.read() == '1') + + def test_getitem(self): + # Retrieve message using __getitem__() + key0 = self._box.add(self._template % 0) + msg = self._box[key0] + self.assert_(msg['from'] == 'foo') + self.assert_(msg.get_payload() == '0') + self.assertRaises(KeyError, lambda: self._box['foo']) + self._box.discard(key0) + self.assertRaises(KeyError, lambda: self._box[key0]) + + def test_get_message(self): + # Get Message representations of messages + key0 = self._box.add(self._template % 0) + key1 = self._box.add(_sample_message) + msg0 = self._box.get_message(key0) + self.assert_(isinstance(msg0, mailbox.Message)) + self.assert_(msg0['from'] == 'foo') + self.assert_(msg0.get_payload() == '0') + self._check_sample(self._box.get_message(key1)) + + def test_get_string(self): + # Get string representations of messages + key0 = self._box.add(self._template % 0) + key1 = self._box.add(_sample_message) + self.assert_(self._box.get_string(key0) == self._template % 0) + self.assert_(self._box.get_string(key1) == _sample_message) + + def test_get_file(self): + # Get file representations of messages + key0 = self._box.add(self._template % 0) + key1 = self._box.add(_sample_message) + self.assert_(self._box.get_file(key0).read().replace(os.linesep, '\n') + == self._template % 0) + self.assert_(self._box.get_file(key1).read().replace(os.linesep, '\n') + == _sample_message) + + def test_iterkeys(self): + # Get keys using iterkeys() + self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) + + def test_keys(self): + # Get keys using keys() + self._check_iteration(self._box.keys, do_keys=True, do_values=False) + + def test_itervalues(self): + # Get values using itervalues() + self._check_iteration(self._box.itervalues, do_keys=False, + do_values=True) + + def test_iter(self): + # Get values using __iter__() + self._check_iteration(self._box.__iter__, do_keys=False, + do_values=True) + + def test_values(self): + # Get values using values() + self._check_iteration(self._box.values, do_keys=False, do_values=True) + + def test_iteritems(self): + # Get keys and values using iteritems() + self._check_iteration(self._box.iteritems, do_keys=True, + do_values=True) + + def test_items(self): + # Get keys and values using items() + self._check_iteration(self._box.items, do_keys=True, do_values=True) + + def _check_iteration(self, method, do_keys, do_values, repetitions=10): + for value in method(): + self.fail("Not empty") + keys, values = [], [] + for i in xrange(repetitions): + keys.append(self._box.add(self._template % i)) + values.append(self._template % i) + if do_keys and not do_values: + returned_keys = list(method()) + elif do_values and not do_keys: + returned_values = list(method()) + else: + returned_keys, returned_values = [], [] + for key, value in method(): + returned_keys.append(key) + returned_values.append(value) + if do_keys: + self.assert_(len(keys) == len(returned_keys)) + self.assert_(set(keys) == set(returned_keys)) + if do_values: + count = 0 + for value in returned_values: + self.assert_(value['from'] == 'foo') + self.assert_(int(value.get_payload()) < repetitions) + count += 1 + self.assert_(len(values) == count) + + def test_has_key(self): + # Check existence of keys using has_key() + self._test_has_key_or_contains(self._box.has_key) + + def test_contains(self): + # Check existence of keys using __contains__() + self._test_has_key_or_contains(self._box.__contains__) + + def _test_has_key_or_contains(self, method): + # (Used by test_has_key() and test_contains().) + self.assert_(not method('foo')) + key0 = self._box.add(self._template % 0) + self.assert_(method(key0)) + self.assert_(not method('foo')) + key1 = self._box.add(self._template % 1) + self.assert_(method(key1)) + self.assert_(method(key0)) + self.assert_(not method('foo')) + self._box.remove(key0) + self.assert_(not method(key0)) + self.assert_(method(key1)) + self.assert_(not method('foo')) + self._box.remove(key1) + self.assert_(not method(key1)) + self.assert_(not method(key0)) + self.assert_(not method('foo')) + + def test_len(self, repetitions=10): + # Get message count + keys = [] + for i in xrange(repetitions): + self.assert_(len(self._box) == i) + keys.append(self._box.add(self._template % i)) + self.assert_(len(self._box) == i + 1) + for i in xrange(repetitions): + self.assert_(len(self._box) == repetitions - i) + self._box.remove(keys[i]) + self.assert_(len(self._box) == repetitions - i - 1) + + def test_set_item(self): + # Modify messages using __setitem__() + key0 = self._box.add(self._template % 'original 0') + self.assert_(self._box.get_string(key0) == \ + self._template % 'original 0') + key1 = self._box.add(self._template % 'original 1') + self.assert_(self._box.get_string(key1) == \ + self._template % 'original 1') + self._box[key0] = self._template % 'changed 0' + self.assert_(self._box.get_string(key0) == \ + self._template % 'changed 0') + self._box[key1] = self._template % 'changed 1' + self.assert_(self._box.get_string(key1) == \ + self._template % 'changed 1') + self._box[key0] = _sample_message + self._check_sample(self._box[key0]) + self._box[key1] = self._box[key0] + self._check_sample(self._box[key1]) + self._box[key0] = self._template % 'original 0' + self.assert_(self._box.get_string(key0) == + self._template % 'original 0') + self._check_sample(self._box[key1]) + self.assertRaises(KeyError, + lambda: self._box.__setitem__('foo', 'bar')) + self.assertRaises(KeyError, lambda: self._box['foo']) + self.assert_(len(self._box) == 2) + + def test_clear(self, iterations=10): + # Remove all messages using clear() + keys = [] + for i in xrange(iterations): + self._box.add(self._template % i) + for i, key in enumerate(keys): + self.assert_(self._box.get_string(key) == self._template % i) + self._box.clear() + self.assert_(len(self._box) == 0) + for i, key in enumerate(keys): + self.assertRaises(KeyError, lambda: self._box.get_string(key)) + + def test_pop(self): + # Get and remove a message using pop() + key0 = self._box.add(self._template % 0) + self.assert_(key0 in self._box) + key1 = self._box.add(self._template % 1) + self.assert_(key1 in self._box) + self.assert_(self._box.pop(key0).get_payload() == '0') + self.assert_(key0 not in self._box) + self.assert_(key1 in self._box) + key2 = self._box.add(self._template % 2) + self.assert_(key2 in self._box) + self.assert_(self._box.pop(key2).get_payload() == '2') + self.assert_(key2 not in self._box) + self.assert_(key1 in self._box) + self.assert_(self._box.pop(key1).get_payload() == '1') + self.assert_(key1 not in self._box) + self.assert_(len(self._box) == 0) + + def test_popitem(self, iterations=10): + # Get and remove an arbitrary (key, message) using popitem() + keys = [] + for i in xrange(10): + keys.append(self._box.add(self._template % i)) + seen = [] + for i in xrange(10): + key, msg = self._box.popitem() + self.assert_(key in keys) + self.assert_(key not in seen) + seen.append(key) + self.assert_(int(msg.get_payload()) == keys.index(key)) + self.assert_(len(self._box) == 0) + for key in keys: + self.assertRaises(KeyError, lambda: self._box[key]) + + def test_update(self): + # Modify multiple messages using update() + key0 = self._box.add(self._template % 'original 0') + key1 = self._box.add(self._template % 'original 1') + key2 = self._box.add(self._template % 'original 2') + self._box.update({key0: self._template % 'changed 0', + key2: _sample_message}) + self.assert_(len(self._box) == 3) + self.assert_(self._box.get_string(key0) == + self._template % 'changed 0') + self.assert_(self._box.get_string(key1) == + self._template % 'original 1') + self._check_sample(self._box[key2]) + self._box.update([(key2, self._template % 'changed 2'), + (key1, self._template % 'changed 1'), + (key0, self._template % 'original 0')]) + self.assert_(len(self._box) == 3) + self.assert_(self._box.get_string(key0) == + self._template % 'original 0') + self.assert_(self._box.get_string(key1) == + self._template % 'changed 1') + self.assert_(self._box.get_string(key2) == + self._template % 'changed 2') + self.assertRaises(KeyError, + lambda: self._box.update({'foo': 'bar', + key0: self._template % "changed 0"})) + self.assert_(len(self._box) == 3) + self.assert_(self._box.get_string(key0) == + self._template % "changed 0") + self.assert_(self._box.get_string(key1) == + self._template % "changed 1") + self.assert_(self._box.get_string(key2) == + self._template % "changed 2") + + def test_flush(self): + # Write changes to disk + self._test_flush_or_close(self._box.flush) + + def test_lock_unlock(self): + # Lock and unlock the mailbox + self.assert_(not os.path.exists(self._get_lock_path())) + self._box.lock() + self.assert_(os.path.exists(self._get_lock_path())) + self._box.unlock() + self.assert_(not os.path.exists(self._get_lock_path())) + + def test_close(self): + # Close mailbox and flush changes to disk + self._test_flush_or_close(self._box.close) + + def _test_flush_or_close(self, method): + contents = [self._template % i for i in xrange(3)] + self._box.add(contents[0]) + self._box.add(contents[1]) + self._box.add(contents[2]) + method() + self._box = self._factory(self._path) + keys = self._box.keys() + self.assert_(len(keys) == 3) + for key in keys: + self.assert_(self._box.get_string(key) in contents) + + def test_dump_message(self): + # Write message representations to disk + for input in (email.message_from_string(_sample_message), + _sample_message, StringIO.StringIO(_sample_message)): + output = StringIO.StringIO() + self._box._dump_message(input, output) + self.assert_(output.getvalue() == + _sample_message.replace('\n', os.linesep)) + output = StringIO.StringIO() + self.assertRaises(TypeError, + lambda: self._box._dump_message(None, output)) + + def _get_lock_path(self): + # Return the path of the dot lock file. May be overridden. + return self._path + '.lock' + + +class TestMailboxSuperclass(TestBase): + + def test_notimplemented(self): + # Test that all Mailbox methods raise NotImplementedException. + box = mailbox.Mailbox('path') + self.assertRaises(NotImplementedError, lambda: box.add('')) + self.assertRaises(NotImplementedError, lambda: box.remove('')) + self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) + self.assertRaises(NotImplementedError, lambda: box.discard('')) + self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) + self.assertRaises(NotImplementedError, lambda: box.iterkeys()) + self.assertRaises(NotImplementedError, lambda: box.keys()) + self.assertRaises(NotImplementedError, lambda: box.itervalues().next()) + self.assertRaises(NotImplementedError, lambda: box.__iter__().next()) + self.assertRaises(NotImplementedError, lambda: box.values()) + self.assertRaises(NotImplementedError, lambda: box.iteritems().next()) + self.assertRaises(NotImplementedError, lambda: box.items()) + self.assertRaises(NotImplementedError, lambda: box.get('')) + self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) + self.assertRaises(NotImplementedError, lambda: box.get_message('')) + self.assertRaises(NotImplementedError, lambda: box.get_string('')) + self.assertRaises(NotImplementedError, lambda: box.get_file('')) + self.assertRaises(NotImplementedError, lambda: box.has_key('')) + self.assertRaises(NotImplementedError, lambda: box.__contains__('')) + self.assertRaises(NotImplementedError, lambda: box.__len__()) + self.assertRaises(NotImplementedError, lambda: box.clear()) + self.assertRaises(NotImplementedError, lambda: box.pop('')) + self.assertRaises(NotImplementedError, lambda: box.popitem()) + self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) + self.assertRaises(NotImplementedError, lambda: box.flush()) + self.assertRaises(NotImplementedError, lambda: box.lock()) + self.assertRaises(NotImplementedError, lambda: box.unlock()) + self.assertRaises(NotImplementedError, lambda: box.close()) + + +class TestMaildir(TestMailbox): + + _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) + + def setUp(self): + TestMailbox.setUp(self) + if os.name == 'nt': + self._box.colon = '!' + + def test_add_MM(self): + # Add a MaildirMessage instance + msg = mailbox.MaildirMessage(self._template % 0) + msg.set_subdir('cur') + msg.set_info('foo') + key = self._box.add(msg) + self.assert_(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % + (key, self._box.colon)))) + + def test_get_MM(self): + # Get a MaildirMessage instance + msg = mailbox.MaildirMessage(self._template % 0) + msg.set_subdir('cur') + msg.set_flags('RF') + key = self._box.add(msg) + msg_returned = self._box.get_message(key) + self.assert_(isinstance(msg_returned, mailbox.MaildirMessage)) + self.assert_(msg_returned.get_subdir() == 'cur') + self.assert_(msg_returned.get_flags() == 'FR') + + def test_set_MM(self): + # Set with a MaildirMessage instance + msg0 = mailbox.MaildirMessage(self._template % 0) + msg0.set_flags('TP') + key = self._box.add(msg0) + msg_returned = self._box.get_message(key) + self.assert_(msg_returned.get_subdir() == 'new') + self.assert_(msg_returned.get_flags() == 'PT') + msg1 = mailbox.MaildirMessage(self._template % 1) + self._box[key] = msg1 + msg_returned = self._box.get_message(key) + self.assert_(msg_returned.get_subdir() == 'new') + self.assert_(msg_returned.get_flags() == '') + self.assert_(msg_returned.get_payload() == '1') + msg2 = mailbox.MaildirMessage(self._template % 2) + msg2.set_info('2,S') + self._box[key] = msg2 + self._box[key] = self._template % 3 + msg_returned = self._box.get_message(key) + self.assert_(msg_returned.get_subdir() == 'new') + self.assert_(msg_returned.get_flags() == 'S') + self.assert_(msg_returned.get_payload() == '3') + + def test_initialize_new(self): + # Initialize a non-existent mailbox + self.tearDown() + self._box = mailbox.Maildir(self._path) + self._check_basics(factory=rfc822.Message) + self._delete_recursively(self._path) + self._box = self._factory(self._path, factory=None) + self._check_basics() + + def test_initialize_existing(self): + # Initialize an existing mailbox + self.tearDown() + for subdir in '', 'tmp', 'new', 'cur': + os.mkdir(os.path.join(self._path, subdir)) + self._box = mailbox.Maildir(self._path) + self._check_basics(factory=rfc822.Message) + self._box = mailbox.Maildir(self._path, factory=None) + self._check_basics() + + def _check_basics(self, factory=None): + # (Used by test_open_new() and test_open_existing().) + self.assertEqual(self._box._path, os.path.abspath(self._path)) + self.assertEqual(self._box._factory, factory) + for subdir in '', 'tmp', 'new', 'cur': + path = os.path.join(self._path, subdir) + mode = os.stat(path)[stat.ST_MODE] + self.assert_(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) + + def test_list_folders(self): + # List folders + self._box.add_folder('one') + self._box.add_folder('two') + self._box.add_folder('three') + self.assert_(len(self._box.list_folders()) == 3) + self.assert_(set(self._box.list_folders()) == + set(('one', 'two', 'three'))) + + def test_get_folder(self): + # Open folders + self._box.add_folder('foo.bar') + folder0 = self._box.get_folder('foo.bar') + folder0.add(self._template % 'bar') + self.assert_(os.path.isdir(os.path.join(self._path, '.foo.bar'))) + folder1 = self._box.get_folder('foo.bar') + self.assert_(folder1.get_string(folder1.keys()[0]) == \ + self._template % 'bar') + + def test_add_and_remove_folders(self): + # Delete folders + self._box.add_folder('one') + self._box.add_folder('two') + self.assert_(len(self._box.list_folders()) == 2) + self.assert_(set(self._box.list_folders()) == set(('one', 'two'))) + self._box.remove_folder('one') + self.assert_(len(self._box.list_folders()) == 1) + self.assert_(set(self._box.list_folders()) == set(('two',))) + self._box.add_folder('three') + self.assert_(len(self._box.list_folders()) == 2) + self.assert_(set(self._box.list_folders()) == set(('two', 'three'))) + self._box.remove_folder('three') + self.assert_(len(self._box.list_folders()) == 1) + self.assert_(set(self._box.list_folders()) == set(('two',))) + self._box.remove_folder('two') + self.assert_(len(self._box.list_folders()) == 0) + self.assert_(self._box.list_folders() == []) + + def test_clean(self): + # Remove old files from 'tmp' + foo_path = os.path.join(self._path, 'tmp', 'foo') + bar_path = os.path.join(self._path, 'tmp', 'bar') + f = open(foo_path, 'w') + f.write("@") + f.close() + f = open(bar_path, 'w') + f.write("@") + f.close() + self._box.clean() + self.assert_(os.path.exists(foo_path)) + self.assert_(os.path.exists(bar_path)) + foo_stat = os.stat(foo_path) + os.utime(foo_path, (time.time() - 129600 - 2, + foo_stat.st_mtime)) + self._box.clean() + self.assert_(not os.path.exists(foo_path)) + self.assert_(os.path.exists(bar_path)) + + def test_create_tmp(self, repetitions=10): + # Create files in tmp directory + hostname = socket.gethostname() + if '/' in hostname: + hostname = hostname.replace('/', r'\057') + if ':' in hostname: + hostname = hostname.replace(':', r'\072') + pid = os.getpid() + pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" + r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)") + previous_groups = None + for x in xrange(repetitions): + tmp_file = self._box._create_tmp() + head, tail = os.path.split(tmp_file.name) + self.assertEqual(head, os.path.abspath(os.path.join(self._path, + "tmp")), + "File in wrong location: '%s'" % head) + match = pattern.match(tail) + self.assert_(match != None, "Invalid file name: '%s'" % tail) + groups = match.groups() + if previous_groups != None: + self.assert_(int(groups[0] >= previous_groups[0]), + "Non-monotonic seconds: '%s' before '%s'" % + (previous_groups[0], groups[0])) + self.assert_(int(groups[1] >= previous_groups[1]) or + groups[0] != groups[1], + "Non-monotonic milliseconds: '%s' before '%s'" % + (previous_groups[1], groups[1])) + self.assert_(int(groups[2]) == pid, + "Process ID mismatch: '%s' should be '%s'" % + (groups[2], pid)) + self.assert_(int(groups[3]) == int(previous_groups[3]) + 1, + "Non-sequential counter: '%s' before '%s'" % + (previous_groups[3], groups[3])) + self.assert_(groups[4] == hostname, + "Host name mismatch: '%s' should be '%s'" % + (groups[4], hostname)) + previous_groups = groups + tmp_file.write(_sample_message) + tmp_file.seek(0) + self.assert_(tmp_file.read() == _sample_message) + tmp_file.close() + file_count = len(os.listdir(os.path.join(self._path, "tmp"))) + self.assert_(file_count == repetitions, + "Wrong file count: '%s' should be '%s'" % + (file_count, repetitions)) + + def test_refresh(self): + # Update the table of contents + self.assert_(self._box._toc == {}) + key0 = self._box.add(self._template % 0) + key1 = self._box.add(self._template % 1) + self.assert_(self._box._toc == {}) + self._box._refresh() + self.assert_(self._box._toc == {key0: os.path.join('new', key0), + key1: os.path.join('new', key1)}) + key2 = self._box.add(self._template % 2) + self.assert_(self._box._toc == {key0: os.path.join('new', key0), + key1: os.path.join('new', key1)}) + self._box._refresh() + self.assert_(self._box._toc == {key0: os.path.join('new', key0), + key1: os.path.join('new', key1), + key2: os.path.join('new', key2)}) + + def test_lookup(self): + # Look up message subpaths in the TOC + self.assertRaises(KeyError, lambda: self._box._lookup('foo')) + key0 = self._box.add(self._template % 0) + self.assert_(self._box._lookup(key0) == os.path.join('new', key0)) + os.remove(os.path.join(self._path, 'new', key0)) + self.assert_(self._box._toc == {key0: os.path.join('new', key0)}) + self.assertRaises(KeyError, lambda: self._box._lookup(key0)) + self.assert_(self._box._toc == {}) + + def test_lock_unlock(self): + # Lock and unlock the mailbox. For Maildir, this does nothing. + self._box.lock() + self._box.unlock() + + +class _TestMboxMMDF(TestMailbox): + + def tearDown(self): + self._box.close() + self._delete_recursively(self._path) + for lock_remnant in glob.glob(self._path + '.*'): + os.remove(lock_remnant) + + def test_add_from_string(self): + # Add a string starting with 'From ' to the mailbox + key = self._box.add('From foo@bar blah\nFrom: foo\n\n0') + self.assert_(self._box[key].get_from() == 'foo@bar blah') + self.assert_(self._box[key].get_payload() == '0') + + def test_add_mbox_or_mmdf_message(self): + # Add an mboxMessage or MMDFMessage + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg = class_('From foo@bar blah\nFrom: foo\n\n0') + key = self._box.add(msg) + + def test_open_close_open(self): + # Open and inspect previously-created mailbox + values = [self._template % i for i in xrange(3)] + for value in values: + self._box.add(value) + self._box.close() + mtime = os.path.getmtime(self._path) + self._box = self._factory(self._path) + self.assert_(len(self._box) == 3) + for key in self._box.iterkeys(): + self.assert_(self._box.get_string(key) in values) + self._box.close() + self.assert_(mtime == os.path.getmtime(self._path)) + + def test_add_and_close(self): + # Verifying that closing a mailbox doesn't change added items + self._box.add(_sample_message) + for i in xrange(3): + self._box.add(self._template % i) + self._box.add(_sample_message) + self._box._file.flush() + self._box._file.seek(0) + contents = self._box._file.read() + self._box.close() + self.assert_(contents == open(self._path, 'rb').read()) + self._box = self._factory(self._path) + + +class TestMbox(_TestMboxMMDF): + + _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) + + +class TestMMDF(_TestMboxMMDF): + + _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) + + +class TestMH(TestMailbox): + + _factory = lambda self, path, factory=None: mailbox.MH(path, factory) + + def test_list_folders(self): + # List folders + self._box.add_folder('one') + self._box.add_folder('two') + self._box.add_folder('three') + self.assert_(len(self._box.list_folders()) == 3) + self.assert_(set(self._box.list_folders()) == + set(('one', 'two', 'three'))) + + def test_get_folder(self): + # Open folders + self._box.add_folder('foo.bar') + folder0 = self._box.get_folder('foo.bar') + folder0.add(self._template % 'bar') + self.assert_(os.path.isdir(os.path.join(self._path, 'foo.bar'))) + folder1 = self._box.get_folder('foo.bar') + self.assert_(folder1.get_string(folder1.keys()[0]) == \ + self._template % 'bar') + + def test_add_and_remove_folders(self): + # Delete folders + self._box.add_folder('one') + self._box.add_folder('two') + self.assert_(len(self._box.list_folders()) == 2) + self.assert_(set(self._box.list_folders()) == set(('one', 'two'))) + self._box.remove_folder('one') + self.assert_(len(self._box.list_folders()) == 1) + self.assert_(set(self._box.list_folders()) == set(('two',))) + self._box.add_folder('three') + self.assert_(len(self._box.list_folders()) == 2) + self.assert_(set(self._box.list_folders()) == set(('two', 'three'))) + self._box.remove_folder('three') + self.assert_(len(self._box.list_folders()) == 1) + self.assert_(set(self._box.list_folders()) == set(('two',))) + self._box.remove_folder('two') + self.assert_(len(self._box.list_folders()) == 0) + self.assert_(self._box.list_folders() == []) + + def test_sequences(self): + # Get and set sequences + self.assert_(self._box.get_sequences() == {}) + msg0 = mailbox.MHMessage(self._template % 0) + msg0.add_sequence('foo') + key0 = self._box.add(msg0) + self.assert_(self._box.get_sequences() == {'foo':[key0]}) + msg1 = mailbox.MHMessage(self._template % 1) + msg1.set_sequences(['bar', 'replied', 'foo']) + key1 = self._box.add(msg1) + self.assert_(self._box.get_sequences() == + {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]}) + msg0.set_sequences(['flagged']) + self._box[key0] = msg0 + self.assert_(self._box.get_sequences() == + {'foo':[key1], 'bar':[key1], 'replied':[key1], + 'flagged':[key0]}) + self._box.remove(key1) + self.assert_(self._box.get_sequences() == {'flagged':[key0]}) + + def test_pack(self): + # Pack the contents of the mailbox + msg0 = mailbox.MHMessage(self._template % 0) + msg1 = mailbox.MHMessage(self._template % 1) + msg2 = mailbox.MHMessage(self._template % 2) + msg3 = mailbox.MHMessage(self._template % 3) + msg0.set_sequences(['foo', 'unseen']) + msg1.set_sequences(['foo']) + msg2.set_sequences(['foo', 'flagged']) + msg3.set_sequences(['foo', 'bar', 'replied']) + key0 = self._box.add(msg0) + key1 = self._box.add(msg1) + key2 = self._box.add(msg2) + key3 = self._box.add(msg3) + self.assert_(self._box.get_sequences() == + {'foo':[key0,key1,key2,key3], 'unseen':[key0], + 'flagged':[key2], 'bar':[key3], 'replied':[key3]}) + self._box.remove(key2) + self.assert_(self._box.get_sequences() == + {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3], + 'replied':[key3]}) + self._box.pack() + self.assert_(self._box.keys() == [1, 2, 3]) + key0 = key0 + key1 = key0 + 1 + key2 = key1 + 1 + self.assert_(self._box.get_sequences() == + {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]}) + + def _get_lock_path(self): + return os.path.join(self._path, '.mh_sequences.lock') + + +class TestBabyl(TestMailbox): + + _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) + + def tearDown(self): + self._box.close() + self._delete_recursively(self._path) + for lock_remnant in glob.glob(self._path + '.*'): + os.remove(lock_remnant) + + def test_labels(self): + # Get labels from the mailbox + self.assert_(self._box.get_labels() == []) + msg0 = mailbox.BabylMessage(self._template % 0) + msg0.add_label('foo') + key0 = self._box.add(msg0) + self.assert_(self._box.get_labels() == ['foo']) + msg1 = mailbox.BabylMessage(self._template % 1) + msg1.set_labels(['bar', 'answered', 'foo']) + key1 = self._box.add(msg1) + self.assert_(set(self._box.get_labels()) == set(['foo', 'bar'])) + msg0.set_labels(['blah', 'filed']) + self._box[key0] = msg0 + self.assert_(set(self._box.get_labels()) == + set(['foo', 'bar', 'blah'])) + self._box.remove(key1) + self.assert_(set(self._box.get_labels()) == set(['blah'])) + + +class TestMessage(TestBase): + + _factory = mailbox.Message # Overridden by subclasses to reuse tests + + def setUp(self): + self._path = test_support.TESTFN + + def tearDown(self): + self._delete_recursively(self._path) + + def test_initialize_with_eMM(self): + # Initialize based on email.Message.Message instance + eMM = email.message_from_string(_sample_message) + msg = self._factory(eMM) + self._post_initialize_hook(msg) + self._check_sample(msg) + + def test_initialize_with_string(self): + # Initialize based on string + msg = self._factory(_sample_message) + self._post_initialize_hook(msg) + self._check_sample(msg) + + def test_initialize_with_file(self): + # Initialize based on contents of file + f = open(self._path, 'w+') + f.write(_sample_message) + f.seek(0) + msg = self._factory(f) + self._post_initialize_hook(msg) + self._check_sample(msg) + f.close() + + def test_initialize_with_nothing(self): + # Initialize without arguments + msg = self._factory() + self._post_initialize_hook(msg) + self.assert_(isinstance(msg, email.Message.Message)) + self.assert_(isinstance(msg, mailbox.Message)) + self.assert_(isinstance(msg, self._factory)) + self.assert_(msg.keys() == []) + self.assert_(not msg.is_multipart()) + self.assert_(msg.get_payload() == None) + + def test_initialize_incorrectly(self): + # Initialize with invalid argument + self.assertRaises(TypeError, lambda: self._factory(object())) + + def test_become_message(self): + # Take on the state of another message + eMM = email.message_from_string(_sample_message) + msg = self._factory() + msg._become_message(eMM) + self._check_sample(msg) + + def test_explain_to(self): + # Copy self's format-specific data to other message formats. + # This test is superficial; better ones are in TestMessageConversion. + msg = self._factory() + for class_ in (mailbox.Message, mailbox.MaildirMessage, + mailbox.mboxMessage, mailbox.MHMessage, + mailbox.BabylMessage, mailbox.MMDFMessage): + other_msg = class_() + msg._explain_to(other_msg) + other_msg = email.Message.Message() + self.assertRaises(TypeError, lambda: msg._explain_to(other_msg)) + + def _post_initialize_hook(self, msg): + # Overridden by subclasses to check extra things after initialization + pass + + +class TestMaildirMessage(TestMessage): + + _factory = mailbox.MaildirMessage + + def _post_initialize_hook(self, msg): + self.assert_(msg._subdir == 'new') + self.assert_(msg._info == '') + + def test_subdir(self): + # Use get_subdir() and set_subdir() + msg = mailbox.MaildirMessage(_sample_message) + self.assert_(msg.get_subdir() == 'new') + msg.set_subdir('cur') + self.assert_(msg.get_subdir() == 'cur') + msg.set_subdir('new') + self.assert_(msg.get_subdir() == 'new') + self.assertRaises(ValueError, lambda: msg.set_subdir('tmp')) + self.assert_(msg.get_subdir() == 'new') + msg.set_subdir('new') + self.assert_(msg.get_subdir() == 'new') + self._check_sample(msg) + + def test_flags(self): + # Use get_flags(), set_flags(), add_flag(), remove_flag() + msg = mailbox.MaildirMessage(_sample_message) + self.assert_(msg.get_flags() == '') + self.assert_(msg.get_subdir() == 'new') + msg.set_flags('F') + self.assert_(msg.get_subdir() == 'new') + self.assert_(msg.get_flags() == 'F') + msg.set_flags('SDTP') + self.assert_(msg.get_flags() == 'DPST') + msg.add_flag('FT') + self.assert_(msg.get_flags() == 'DFPST') + msg.remove_flag('TDRP') + self.assert_(msg.get_flags() == 'FS') + self.assert_(msg.get_subdir() == 'new') + self._check_sample(msg) + + def test_date(self): + # Use get_date() and set_date() + msg = mailbox.MaildirMessage(_sample_message) + self.assert_(abs(msg.get_date() - time.time()) < 60) + msg.set_date(0.0) + self.assert_(msg.get_date() == 0.0) + + def test_info(self): + # Use get_info() and set_info() + msg = mailbox.MaildirMessage(_sample_message) + self.assert_(msg.get_info() == '') + msg.set_info('1,foo=bar') + self.assert_(msg.get_info() == '1,foo=bar') + self.assertRaises(TypeError, lambda: msg.set_info(None)) + self._check_sample(msg) + + def test_info_and_flags(self): + # Test interaction of info and flag methods + msg = mailbox.MaildirMessage(_sample_message) + self.assert_(msg.get_info() == '') + msg.set_flags('SF') + self.assert_(msg.get_flags() == 'FS') + self.assert_(msg.get_info() == '2,FS') + msg.set_info('1,') + self.assert_(msg.get_flags() == '') + self.assert_(msg.get_info() == '1,') + msg.remove_flag('RPT') + self.assert_(msg.get_flags() == '') + self.assert_(msg.get_info() == '1,') + msg.add_flag('D') + self.assert_(msg.get_flags() == 'D') + self.assert_(msg.get_info() == '2,D') + self._check_sample(msg) + + +class _TestMboxMMDFMessage(TestMessage): + + _factory = mailbox._mboxMMDFMessage + + def _post_initialize_hook(self, msg): + self._check_from(msg) + + def test_initialize_with_unixfrom(self): + # Initialize with a message that already has a _unixfrom attribute + msg = mailbox.Message(_sample_message) + msg.set_unixfrom('From foo@bar blah') + msg = mailbox.mboxMessage(msg) + self.assert_(msg.get_from() == 'foo@bar blah', msg.get_from()) + + def test_from(self): + # Get and set "From " line + msg = mailbox.mboxMessage(_sample_message) + self._check_from(msg) + msg.set_from('foo bar') + self.assert_(msg.get_from() == 'foo bar') + msg.set_from('foo@bar', True) + self._check_from(msg, 'foo@bar') + msg.set_from('blah@temp', time.localtime()) + self._check_from(msg, 'blah@temp') + + def test_flags(self): + # Use get_flags(), set_flags(), add_flag(), remove_flag() + msg = mailbox.mboxMessage(_sample_message) + self.assert_(msg.get_flags() == '') + msg.set_flags('F') + self.assert_(msg.get_flags() == 'F') + msg.set_flags('XODR') + self.assert_(msg.get_flags() == 'RODX') + msg.add_flag('FA') + self.assert_(msg.get_flags() == 'RODFAX') + msg.remove_flag('FDXA') + self.assert_(msg.get_flags() == 'RO') + self._check_sample(msg) + + def _check_from(self, msg, sender=None): + # Check contents of "From " line + if sender is None: + sender = "MAILER-DAEMON" + self.assert_(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:" + r"\d{2} \d{4}", msg.get_from()) is not None) + + +class TestMboxMessage(_TestMboxMMDFMessage): + + _factory = mailbox.mboxMessage + + +class TestMHMessage(TestMessage): + + _factory = mailbox.MHMessage + + def _post_initialize_hook(self, msg): + self.assert_(msg._sequences == []) + + def test_sequences(self): + # Get, set, join, and leave sequences + msg = mailbox.MHMessage(_sample_message) + self.assert_(msg.get_sequences() == []) + msg.set_sequences(['foobar']) + self.assert_(msg.get_sequences() == ['foobar']) + msg.set_sequences([]) + self.assert_(msg.get_sequences() == []) + msg.add_sequence('unseen') + self.assert_(msg.get_sequences() == ['unseen']) + msg.add_sequence('flagged') + self.assert_(msg.get_sequences() == ['unseen', 'flagged']) + msg.add_sequence('flagged') + self.assert_(msg.get_sequences() == ['unseen', 'flagged']) + msg.remove_sequence('unseen') + self.assert_(msg.get_sequences() == ['flagged']) + msg.add_sequence('foobar') + self.assert_(msg.get_sequences() == ['flagged', 'foobar']) + msg.remove_sequence('replied') + self.assert_(msg.get_sequences() == ['flagged', 'foobar']) + msg.set_sequences(['foobar', 'replied']) + self.assert_(msg.get_sequences() == ['foobar', 'replied']) + + +class TestBabylMessage(TestMessage): + + _factory = mailbox.BabylMessage + + def _post_initialize_hook(self, msg): + self.assert_(msg._labels == []) + + def test_labels(self): + # Get, set, join, and leave labels + msg = mailbox.BabylMessage(_sample_message) + self.assert_(msg.get_labels() == []) + msg.set_labels(['foobar']) + self.assert_(msg.get_labels() == ['foobar']) + msg.set_labels([]) + self.assert_(msg.get_labels() == []) + msg.add_label('filed') + self.assert_(msg.get_labels() == ['filed']) + msg.add_label('resent') + self.assert_(msg.get_labels() == ['filed', 'resent']) + msg.add_label('resent') + self.assert_(msg.get_labels() == ['filed', 'resent']) + msg.remove_label('filed') + self.assert_(msg.get_labels() == ['resent']) + msg.add_label('foobar') + self.assert_(msg.get_labels() == ['resent', 'foobar']) + msg.remove_label('unseen') + self.assert_(msg.get_labels() == ['resent', 'foobar']) + msg.set_labels(['foobar', 'answered']) + self.assert_(msg.get_labels() == ['foobar', 'answered']) + + def test_visible(self): + # Get, set, and update visible headers + msg = mailbox.BabylMessage(_sample_message) + visible = msg.get_visible() + self.assert_(visible.keys() == []) + self.assert_(visible.get_payload() is None) + visible['User-Agent'] = 'FooBar 1.0' + visible['X-Whatever'] = 'Blah' + self.assert_(msg.get_visible().keys() == []) + msg.set_visible(visible) + visible = msg.get_visible() + self.assert_(visible.keys() == ['User-Agent', 'X-Whatever']) + self.assert_(visible['User-Agent'] == 'FooBar 1.0') + self.assert_(visible['X-Whatever'] == 'Blah') + self.assert_(visible.get_payload() is None) + msg.update_visible() + self.assert_(visible.keys() == ['User-Agent', 'X-Whatever']) + self.assert_(visible.get_payload() is None) + visible = msg.get_visible() + self.assert_(visible.keys() == ['User-Agent', 'Date', 'From', 'To', + 'Subject']) + for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'): + self.assert_(visible[header] == msg[header]) + + +class TestMMDFMessage(_TestMboxMMDFMessage): + + _factory = mailbox.MMDFMessage + + +class TestMessageConversion(TestBase): + + def test_plain_to_x(self): + # Convert Message to all formats + for class_ in (mailbox.Message, mailbox.MaildirMessage, + mailbox.mboxMessage, mailbox.MHMessage, + mailbox.BabylMessage, mailbox.MMDFMessage): + msg_plain = mailbox.Message(_sample_message) + msg = class_(msg_plain) + self._check_sample(msg) + + def test_x_to_plain(self): + # Convert all formats to Message + for class_ in (mailbox.Message, mailbox.MaildirMessage, + mailbox.mboxMessage, mailbox.MHMessage, + mailbox.BabylMessage, mailbox.MMDFMessage): + msg = class_(_sample_message) + msg_plain = mailbox.Message(msg) + self._check_sample(msg_plain) + + def test_x_to_invalid(self): + # Convert all formats to an invalid format + for class_ in (mailbox.Message, mailbox.MaildirMessage, + mailbox.mboxMessage, mailbox.MHMessage, + mailbox.BabylMessage, mailbox.MMDFMessage): + self.assertRaises(TypeError, lambda: class_(False)) + + def test_maildir_to_maildir(self): + # Convert MaildirMessage to MaildirMessage + msg_maildir = mailbox.MaildirMessage(_sample_message) + msg_maildir.set_flags('DFPRST') + msg_maildir.set_subdir('cur') + date = msg_maildir.get_date() + msg = mailbox.MaildirMessage(msg_maildir) + self._check_sample(msg) + self.assert_(msg.get_flags() == 'DFPRST') + self.assert_(msg.get_subdir() == 'cur') + self.assert_(msg.get_date() == date) + + def test_maildir_to_mboxmmdf(self): + # Convert MaildirMessage to mboxmessage and MMDFMessage + pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'), + ('T', 'D'), ('DFPRST', 'RDFA')) + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg_maildir = mailbox.MaildirMessage(_sample_message) + msg_maildir.set_date(0.0) + for setting, result in pairs: + msg_maildir.set_flags(setting) + msg = class_(msg_maildir) + self.assert_(msg.get_flags() == result) + self.assert_(msg.get_from() == 'MAILER-DAEMON %s' % + time.asctime(time.gmtime(0.0))) + msg_maildir.set_subdir('cur') + self.assert_(class_(msg_maildir).get_flags() == 'RODFA') + + def test_maildir_to_mh(self): + # Convert MaildirMessage to MHMessage + msg_maildir = mailbox.MaildirMessage(_sample_message) + pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']), + ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []), + ('T', ['unseen']), ('DFPRST', ['replied', 'flagged'])) + for setting, result in pairs: + msg_maildir.set_flags(setting) + self.assert_(mailbox.MHMessage(msg_maildir).get_sequences() == \ + result) + + def test_maildir_to_babyl(self): + # Convert MaildirMessage to Babyl + msg_maildir = mailbox.MaildirMessage(_sample_message) + pairs = (('D', ['unseen']), ('F', ['unseen']), + ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']), + ('S', []), ('T', ['unseen', 'deleted']), + ('DFPRST', ['deleted', 'answered', 'forwarded'])) + for setting, result in pairs: + msg_maildir.set_flags(setting) + self.assert_(mailbox.BabylMessage(msg_maildir).get_labels() == \ + result) + + def test_mboxmmdf_to_maildir(self): + # Convert mboxMessage and MMDFMessage to MaildirMessage + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg_mboxMMDF = class_(_sample_message) + msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0)) + pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'), + ('RODFA', 'FRST')) + for setting, result in pairs: + msg_mboxMMDF.set_flags(setting) + msg = mailbox.MaildirMessage(msg_mboxMMDF) + self.assert_(msg.get_flags() == result) + self.assert_(msg.get_date() == 0.0, msg.get_date()) + msg_mboxMMDF.set_flags('O') + self.assert_(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir() == \ + 'cur') + + def test_mboxmmdf_to_mboxmmdf(self): + # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg_mboxMMDF = class_(_sample_message) + msg_mboxMMDF.set_flags('RODFA') + msg_mboxMMDF.set_from('foo@bar') + for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg2 = class2_(msg_mboxMMDF) + self.assert_(msg2.get_flags() == 'RODFA') + self.assert_(msg2.get_from() == 'foo@bar') + + def test_mboxmmdf_to_mh(self): + # Convert mboxMessage and MMDFMessage to MHMessage + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg_mboxMMDF = class_(_sample_message) + pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), + ('F', ['unseen', 'flagged']), + ('A', ['unseen', 'replied']), + ('RODFA', ['replied', 'flagged'])) + for setting, result in pairs: + msg_mboxMMDF.set_flags(setting) + self.assert_(mailbox.MHMessage(msg_mboxMMDF).get_sequences() \ + == result) + + def test_mboxmmdf_to_babyl(self): + # Convert mboxMessage and MMDFMessage to BabylMessage + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg = class_(_sample_message) + pairs = (('R', []), ('O', ['unseen']), + ('D', ['unseen', 'deleted']), ('F', ['unseen']), + ('A', ['unseen', 'answered']), + ('RODFA', ['deleted', 'answered'])) + for setting, result in pairs: + msg.set_flags(setting) + self.assert_(mailbox.BabylMessage(msg).get_labels() == result) + + def test_mh_to_maildir(self): + # Convert MHMessage to MaildirMessage + pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) + for setting, result in pairs: + msg = mailbox.MHMessage(_sample_message) + msg.add_sequence(setting) + self.assert_(mailbox.MaildirMessage(msg).get_flags() == result) + self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur') + msg = mailbox.MHMessage(_sample_message) + msg.add_sequence('unseen') + msg.add_sequence('replied') + msg.add_sequence('flagged') + self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'FR') + self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur') + + def test_mh_to_mboxmmdf(self): + # Convert MHMessage to mboxMessage and MMDFMessage + pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) + for setting, result in pairs: + msg = mailbox.MHMessage(_sample_message) + msg.add_sequence(setting) + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + self.assert_(class_(msg).get_flags() == result) + msg = mailbox.MHMessage(_sample_message) + msg.add_sequence('unseen') + msg.add_sequence('replied') + msg.add_sequence('flagged') + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + self.assert_(class_(msg).get_flags() == 'OFA') + + def test_mh_to_mh(self): + # Convert MHMessage to MHMessage + msg = mailbox.MHMessage(_sample_message) + msg.add_sequence('unseen') + msg.add_sequence('replied') + msg.add_sequence('flagged') + self.assert_(mailbox.MHMessage(msg).get_sequences() == \ + ['unseen', 'replied', 'flagged']) + + def test_mh_to_babyl(self): + # Convert MHMessage to BabylMessage + pairs = (('unseen', ['unseen']), ('replied', ['answered']), + ('flagged', [])) + for setting, result in pairs: + msg = mailbox.MHMessage(_sample_message) + msg.add_sequence(setting) + self.assert_(mailbox.BabylMessage(msg).get_labels() == result) + msg = mailbox.MHMessage(_sample_message) + msg.add_sequence('unseen') + msg.add_sequence('replied') + msg.add_sequence('flagged') + self.assert_(mailbox.BabylMessage(msg).get_labels() == \ + ['unseen', 'answered']) + + def test_babyl_to_maildir(self): + # Convert BabylMessage to MaildirMessage + pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), + ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), + ('resent', 'PS')) + for setting, result in pairs: + msg = mailbox.BabylMessage(_sample_message) + msg.add_label(setting) + self.assert_(mailbox.MaildirMessage(msg).get_flags() == result) + self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur') + msg = mailbox.BabylMessage(_sample_message) + for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', + 'edited', 'resent'): + msg.add_label(label) + self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'PRT') + self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur') + + def test_babyl_to_mboxmmdf(self): + # Convert BabylMessage to mboxMessage and MMDFMessage + pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), + ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), + ('resent', 'RO')) + for setting, result in pairs: + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + msg = mailbox.BabylMessage(_sample_message) + msg.add_label(setting) + self.assert_(class_(msg).get_flags() == result) + msg = mailbox.BabylMessage(_sample_message) + for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', + 'edited', 'resent'): + msg.add_label(label) + for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): + self.assert_(class_(msg).get_flags() == 'ODA') + + def test_babyl_to_mh(self): + # Convert BabylMessage to MHMessage + pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), + ('answered', ['replied']), ('forwarded', []), ('edited', []), + ('resent', [])) + for setting, result in pairs: + msg = mailbox.BabylMessage(_sample_message) + msg.add_label(setting) + self.assert_(mailbox.MHMessage(msg).get_sequences() == result) + msg = mailbox.BabylMessage(_sample_message) + for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', + 'edited', 'resent'): + msg.add_label(label) + self.assert_(mailbox.MHMessage(msg).get_sequences() == \ + ['unseen', 'replied']) + + def test_babyl_to_babyl(self): + # Convert BabylMessage to BabylMessage + msg = mailbox.BabylMessage(_sample_message) + msg.update_visible() + for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', + 'edited', 'resent'): + msg.add_label(label) + msg2 = mailbox.BabylMessage(msg) + self.assert_(msg2.get_labels() == ['unseen', 'deleted', 'filed', + 'answered', 'forwarded', 'edited', + 'resent']) + self.assert_(msg.get_visible().keys() == msg2.get_visible().keys()) + for key in msg.get_visible().keys(): + self.assert_(msg.get_visible()[key] == msg2.get_visible()[key]) + + +class TestProxyFileBase(TestBase): + + def _test_read(self, proxy): + # Read by byte + proxy.seek(0) + self.assert_(proxy.read() == 'bar') + proxy.seek(1) + self.assert_(proxy.read() == 'ar') + proxy.seek(0) + self.assert_(proxy.read(2) == 'ba') + proxy.seek(1) + self.assert_(proxy.read(-1) == 'ar') + proxy.seek(2) + self.assert_(proxy.read(1000) == 'r') + + def _test_readline(self, proxy): + # Read by line + proxy.seek(0) + self.assert_(proxy.readline() == 'foo' + os.linesep) + self.assert_(proxy.readline() == 'bar' + os.linesep) + self.assert_(proxy.readline() == 'fred' + os.linesep) + self.assert_(proxy.readline() == 'bob') + proxy.seek(2) + self.assert_(proxy.readline() == 'o' + os.linesep) + proxy.seek(6 + 2 * len(os.linesep)) + self.assert_(proxy.readline() == 'fred' + os.linesep) + proxy.seek(6 + 2 * len(os.linesep)) + self.assert_(proxy.readline(2) == 'fr') + self.assert_(proxy.readline(-10) == 'ed' + os.linesep) + + def _test_readlines(self, proxy): + # Read multiple lines + proxy.seek(0) + self.assert_(proxy.readlines() == ['foo' + os.linesep, + 'bar' + os.linesep, + 'fred' + os.linesep, 'bob']) + proxy.seek(0) + self.assert_(proxy.readlines(2) == ['foo' + os.linesep]) + proxy.seek(3 + len(os.linesep)) + self.assert_(proxy.readlines(4 + len(os.linesep)) == + ['bar' + os.linesep, 'fred' + os.linesep]) + proxy.seek(3) + self.assert_(proxy.readlines(1000) == [os.linesep, 'bar' + os.linesep, + 'fred' + os.linesep, 'bob']) + + def _test_iteration(self, proxy): + # Iterate by line + proxy.seek(0) + iterator = iter(proxy) + self.assert_(iterator.next() == 'foo' + os.linesep) + self.assert_(iterator.next() == 'bar' + os.linesep) + self.assert_(iterator.next() == 'fred' + os.linesep) + self.assert_(iterator.next() == 'bob') + self.assertRaises(StopIteration, lambda: iterator.next()) + + def _test_seek_and_tell(self, proxy): + # Seek and use tell to check position + proxy.seek(3) + self.assert_(proxy.tell() == 3) + self.assert_(proxy.read(len(os.linesep)) == os.linesep) + proxy.seek(2, 1) + self.assert_(proxy.read(1 + len(os.linesep)) == 'r' + os.linesep) + proxy.seek(-3 - len(os.linesep), 2) + self.assert_(proxy.read(3) == 'bar') + proxy.seek(2, 0) + self.assert_(proxy.read() == 'o' + os.linesep + 'bar' + os.linesep) + proxy.seek(100) + self.assert_(proxy.read() == '') + + def _test_close(self, proxy): + # Close a file + proxy.close() + self.assertRaises(AttributeError, lambda: proxy.close()) + + +class TestProxyFile(TestProxyFileBase): + + def setUp(self): + self._path = test_support.TESTFN + self._file = open(self._path, 'wb+') + + def tearDown(self): + self._file.close() + self._delete_recursively(self._path) + + def test_initialize(self): + # Initialize and check position + self._file.write('foo') + pos = self._file.tell() + proxy0 = mailbox._ProxyFile(self._file) + self.assert_(proxy0.tell() == pos) + self.assert_(self._file.tell() == pos) + proxy1 = mailbox._ProxyFile(self._file, 0) + self.assert_(proxy1.tell() == 0) + self.assert_(self._file.tell() == pos) + + def test_read(self): + self._file.write('bar') + self._test_read(mailbox._ProxyFile(self._file)) + + def test_readline(self): + self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, + os.linesep)) + self._test_readline(mailbox._ProxyFile(self._file)) + + def test_readlines(self): + self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, + os.linesep)) + self._test_readlines(mailbox._ProxyFile(self._file)) + + def test_iteration(self): + self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, + os.linesep)) + self._test_iteration(mailbox._ProxyFile(self._file)) + + def test_seek_and_tell(self): + self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) + self._test_seek_and_tell(mailbox._ProxyFile(self._file)) + + def test_close(self): + self._file.write('foo%sbar%s' % (os.linesep, os.linesep)) + self._test_close(mailbox._ProxyFile(self._file)) + + +class TestPartialFile(TestProxyFileBase): + + def setUp(self): + self._path = test_support.TESTFN + self._file = open(self._path, 'wb+') + + def tearDown(self): + self._file.close() + self._delete_recursively(self._path) + + def test_initialize(self): + # Initialize and check position + self._file.write('foo' + os.linesep + 'bar') + pos = self._file.tell() + proxy = mailbox._PartialFile(self._file, 2, 5) + self.assert_(proxy.tell() == 0) + self.assert_(self._file.tell() == pos) + + def test_read(self): + self._file.write('***bar***') + self._test_read(mailbox._PartialFile(self._file, 3, 6)) + + def test_readline(self): + self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' % + (os.linesep, os.linesep, os.linesep)) + self._test_readline(mailbox._PartialFile(self._file, 5, + 18 + 3 * len(os.linesep))) + + def test_readlines(self): + self._file.write('foo%sbar%sfred%sbob?????' % + (os.linesep, os.linesep, os.linesep)) + self._test_readlines(mailbox._PartialFile(self._file, 0, + 13 + 3 * len(os.linesep))) + + def test_iteration(self): + self._file.write('____foo%sbar%sfred%sbob####' % + (os.linesep, os.linesep, os.linesep)) + self._test_iteration(mailbox._PartialFile(self._file, 4, + 17 + 3 * len(os.linesep))) + + def test_seek_and_tell(self): + self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep)) + self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, + 9 + 2 * len(os.linesep))) + + def test_close(self): + self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep)) + self._test_close(mailbox._PartialFile(self._file, 1, + 6 + 3 * len(os.linesep))) + + +## Start: tests from the original module (for backward compatibility). + FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" DUMMY_MESSAGE = """\ From: some.body@dummy.domain @@ -65,15 +1626,15 @@ class MaildirTestCase(unittest.TestCase): # Test for regression on bug #117490: # Make sure the boxes attribute actually gets set. self.mbox = mailbox.Maildir(test_support.TESTFN) - self.assert_(hasattr(self.mbox, "boxes")) - self.assert_(len(self.mbox.boxes) == 0) + #self.assert_(hasattr(self.mbox, "boxes")) + #self.assert_(len(self.mbox.boxes) == 0) self.assert_(self.mbox.next() is None) self.assert_(self.mbox.next() is None) def test_nonempty_maildir_cur(self): self.createMessage("cur") self.mbox = mailbox.Maildir(test_support.TESTFN) - self.assert_(len(self.mbox.boxes) == 1) + #self.assert_(len(self.mbox.boxes) == 1) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is None) self.assert_(self.mbox.next() is None) @@ -81,7 +1642,7 @@ class MaildirTestCase(unittest.TestCase): def test_nonempty_maildir_new(self): self.createMessage("new") self.mbox = mailbox.Maildir(test_support.TESTFN) - self.assert_(len(self.mbox.boxes) == 1) + #self.assert_(len(self.mbox.boxes) == 1) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is None) self.assert_(self.mbox.next() is None) @@ -90,7 +1651,7 @@ class MaildirTestCase(unittest.TestCase): self.createMessage("cur") self.createMessage("new") self.mbox = mailbox.Maildir(test_support.TESTFN) - self.assert_(len(self.mbox.boxes) == 2) + #self.assert_(len(self.mbox.boxes) == 2) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is None) @@ -108,12 +1669,99 @@ class MaildirTestCase(unittest.TestCase): self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE)) self.assertEqual(n, 1) - # XXX We still need more tests! +## End: classes from the original module (for backward compatibility). + + +_sample_message = """\ +Return-Path: <gkj@gregorykjohnson.com> +X-Original-To: gkj+person@localhost +Delivered-To: gkj+person@localhost +Received: from localhost (localhost [127.0.0.1]) + by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 + for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) +Delivered-To: gkj@sundance.gregorykjohnson.com +Received: from localhost [127.0.0.1] + by localhost with POP3 (fetchmail-6.2.5) + for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) +Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) + by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 + for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) +Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) + id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) +Date: Wed, 13 Jul 2005 17:23:11 -0400 +From: "Gregory K. Johnson" <gkj@gregorykjohnson.com> +To: gkj@gregorykjohnson.com +Subject: Sample message +Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com> +Mime-Version: 1.0 +Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" +Content-Disposition: inline +User-Agent: Mutt/1.5.9i + + +--NMuMz9nt05w80d4+ +Content-Type: text/plain; charset=us-ascii +Content-Disposition: inline + +This is a sample message. + +-- +Gregory K. Johnson + +--NMuMz9nt05w80d4+ +Content-Type: application/octet-stream +Content-Disposition: attachment; filename="text.gz" +Content-Transfer-Encoding: base64 + +H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs +3FYlAAAA + +--NMuMz9nt05w80d4+-- +""" + +_sample_headers = { + "Return-Path":"<gkj@gregorykjohnson.com>", + "X-Original-To":"gkj+person@localhost", + "Delivered-To":"gkj+person@localhost", + "Received":"""from localhost (localhost [127.0.0.1]) + by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 + for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", + "Delivered-To":"gkj@sundance.gregorykjohnson.com", + "Received":"""from localhost [127.0.0.1] + by localhost with POP3 (fetchmail-6.2.5) + for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", + "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) + by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 + for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", + "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) + id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", + "Date":"Wed, 13 Jul 2005 17:23:11 -0400", + "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""", + "To":"gkj@gregorykjohnson.com", + "Subject":"Sample message", + "Mime-Version":"1.0", + "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", + "Content-Disposition":"inline", + "User-Agent": "Mutt/1.5.9i" } + +_sample_payloads = ("""This is a sample message. + +-- +Gregory K. Johnson +""", +"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs +3FYlAAAA +""") def test_main(): - test_support.run_unittest(MaildirTestCase) + tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH, + TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage, + TestMHMessage, TestBabylMessage, TestMMDFMessage, + TestMessageConversion, TestProxyFile, TestPartialFile, + MaildirTestCase) + test_support.run_unittest(*tests) -if __name__ == "__main__": +if __name__ == '__main__': test_main() diff --git a/Lib/test/test_multibytecodec.py b/Lib/test/test_multibytecodec.py index 4d02dee..276b9af 100644 --- a/Lib/test/test_multibytecodec.py +++ b/Lib/test/test_multibytecodec.py @@ -3,7 +3,6 @@ # test_multibytecodec.py # Unit test for multibytecodec itself # -# $CJKCodecs: test_multibytecodec.py,v 1.8 2004/06/19 06:09:55 perky Exp $ from test import test_support from test import test_multibytecodec_support diff --git a/Lib/test/test_old_mailbox.py b/Lib/test/test_old_mailbox.py new file mode 100644 index 0000000..cca6897 --- /dev/null +++ b/Lib/test/test_old_mailbox.py @@ -0,0 +1,120 @@ +# This set of tests exercises the backward-compatibility class +# in mailbox.py (the ones without write support). + +import mailbox +import os +import time +import unittest +from test import test_support + +# cleanup earlier tests +try: + os.unlink(test_support.TESTFN) +except os.error: + pass + +FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n" +DUMMY_MESSAGE = """\ +From: some.body@dummy.domain +To: me@my.domain +Subject: Simple Test + +This is a dummy message. +""" + +class MaildirTestCase(unittest.TestCase): + + def setUp(self): + # create a new maildir mailbox to work with: + self._dir = test_support.TESTFN + os.mkdir(self._dir) + os.mkdir(os.path.join(self._dir, "cur")) + os.mkdir(os.path.join(self._dir, "tmp")) + os.mkdir(os.path.join(self._dir, "new")) + self._counter = 1 + self._msgfiles = [] + + def tearDown(self): + map(os.unlink, self._msgfiles) + os.rmdir(os.path.join(self._dir, "cur")) + os.rmdir(os.path.join(self._dir, "tmp")) + os.rmdir(os.path.join(self._dir, "new")) + os.rmdir(self._dir) + + def createMessage(self, dir, mbox=False): + t = int(time.time() % 1000000) + pid = self._counter + self._counter += 1 + filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain")) + tmpname = os.path.join(self._dir, "tmp", filename) + newname = os.path.join(self._dir, dir, filename) + fp = open(tmpname, "w") + self._msgfiles.append(tmpname) + if mbox: + fp.write(FROM_) + fp.write(DUMMY_MESSAGE) + fp.close() + if hasattr(os, "link"): + os.link(tmpname, newname) + else: + fp = open(newname, "w") + fp.write(DUMMY_MESSAGE) + fp.close() + self._msgfiles.append(newname) + return tmpname + + def test_empty_maildir(self): + """Test an empty maildir mailbox""" + # Test for regression on bug #117490: + self.mbox = mailbox.Maildir(test_support.TESTFN) + self.assert_(len(self.mbox) == 0) + self.assert_(self.mbox.next() is None) + self.assert_(self.mbox.next() is None) + + def test_nonempty_maildir_cur(self): + self.createMessage("cur") + self.mbox = mailbox.Maildir(test_support.TESTFN) + self.assert_(len(self.mbox) == 1) + self.assert_(self.mbox.next() is not None) + self.assert_(self.mbox.next() is None) + self.assert_(self.mbox.next() is None) + + def test_nonempty_maildir_new(self): + self.createMessage("new") + self.mbox = mailbox.Maildir(test_support.TESTFN) + self.assert_(len(self.mbox) == 1) + self.assert_(self.mbox.next() is not None) + self.assert_(self.mbox.next() is None) + self.assert_(self.mbox.next() is None) + + def test_nonempty_maildir_both(self): + self.createMessage("cur") + self.createMessage("new") + self.mbox = mailbox.Maildir(test_support.TESTFN) + self.assert_(len(self.mbox) == 2) + self.assert_(self.mbox.next() is not None) + self.assert_(self.mbox.next() is not None) + self.assert_(self.mbox.next() is None) + self.assert_(self.mbox.next() is None) + + def test_unix_mbox(self): + ### should be better! + import email.Parser + fname = self.createMessage("cur", True) + n = 0 + for msg in mailbox.PortableUnixMailbox(open(fname), + email.Parser.Parser().parse): + n += 1 + self.assertEqual(msg["subject"], "Simple Test") + self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE)) + self.assertEqual(n, 1) + + # XXX We still need more tests! + + +def test_main(): + test_support.run_unittest(MaildirTestCase) + + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_optparse.py b/Lib/test/test_optparse.py index f656b9f..991c06d 100644 --- a/Lib/test/test_optparse.py +++ b/Lib/test/test_optparse.py @@ -10,17 +10,22 @@ import sys import os +import re import copy +import types import unittest from cStringIO import StringIO from pprint import pprint from test import test_support + from optparse import make_option, Option, IndentedHelpFormatter, \ TitledHelpFormatter, OptionParser, OptionContainer, OptionGroup, \ SUPPRESS_HELP, SUPPRESS_USAGE, OptionError, OptionConflictError, \ - BadOptionError, OptionValueError, Values, _match_abbrev + BadOptionError, OptionValueError, Values +from optparse import _match_abbrev +from optparse import _parse_num # Do the right thing with boolean values for all known Python versions. try: @@ -28,6 +33,7 @@ try: except NameError: (True, False) = (1, 0) +retype = type(re.compile('')) class InterceptedError(Exception): def __init__(self, @@ -96,7 +102,8 @@ Args were %(args)s.""" % locals ()) args -- positional arguments to `func` kwargs -- keyword arguments to `func` expected_exception -- exception that should be raised - expected_output -- output we expect to see + expected_message -- expected exception message (or pattern + if a compiled regex object) Returns the exception raised for further testing. """ @@ -109,14 +116,23 @@ Args were %(args)s.""" % locals ()) func(*args, **kwargs) except expected_exception, err: actual_message = str(err) - self.assertEqual(actual_message, - expected_message, + if isinstance(expected_message, retype): + self.assert_(expected_message.search(actual_message), """\ +expected exception message pattern: +/%s/ +actual exception message: +'''%s''' +""" % (expected_message.pattern, actual_message)) + else: + self.assertEqual(actual_message, + expected_message, + """\ expected exception message: -'''%(expected_message)s''' +'''%s''' actual exception message: -'''%(actual_message)s''' -""" % locals()) +'''%s''' +""" % (expected_message, actual_message)) return err else: @@ -157,7 +173,9 @@ and kwargs %(kwargs)r sys.stdout = save_stdout except InterceptedError, err: - self.assertEqual(output, expected_output) + if output != expected_output: + self.fail("expected: \n'''\n" + expected_output + + "'''\nbut got \n'''\n" + output + "'''") self.assertEqual(err.exit_status, expected_status) self.assertEqual(err.exit_message, expected_error) else: @@ -366,6 +384,23 @@ class TestOptionParser(BaseTest): self.assertRaises(self.parser.remove_option, ('foo',), None, ValueError, "no such option 'foo'") + def test_refleak(self): + # If an OptionParser is carrying around a reference to a large + # object, various cycles can prevent it from being GC'd in + # a timely fashion. destroy() breaks the cycles to ensure stuff + # can be cleaned up. + big_thing = [42] + refcount = sys.getrefcount(big_thing) + parser = OptionParser() + parser.add_option("-a", "--aaarggh") + parser.big_thing = big_thing + + parser.destroy() + #self.assertEqual(refcount, sys.getrefcount(big_thing)) + del parser + self.assertEqual(refcount, sys.getrefcount(big_thing)) + + class TestOptionValues(BaseTest): def setUp(self): pass @@ -391,13 +426,21 @@ class TestTypeAliases(BaseTest): def setUp(self): self.parser = OptionParser() - def test_type_aliases(self): - self.parser.add_option("-x", type=int) + def test_str_aliases_string(self): + self.parser.add_option("-s", type="str") + self.assertEquals(self.parser.get_option("-s").type, "string") + + def test_new_type_object(self): self.parser.add_option("-s", type=str) - self.parser.add_option("-t", type="str") + self.assertEquals(self.parser.get_option("-s").type, "string") + self.parser.add_option("-x", type=int) self.assertEquals(self.parser.get_option("-x").type, "int") + + def test_old_type_object(self): + self.parser.add_option("-s", type=types.StringType) self.assertEquals(self.parser.get_option("-s").type, "string") - self.assertEquals(self.parser.get_option("-t").type, "string") + self.parser.add_option("-x", type=types.IntType) + self.assertEquals(self.parser.get_option("-x").type, "int") # Custom type for testing processing of default values. @@ -487,13 +530,13 @@ class TestProgName(BaseTest): save_argv = sys.argv[:] try: sys.argv[0] = os.path.join("foo", "bar", "baz.py") - parser = OptionParser("usage: %prog ...", version="%prog 1.2") - expected_usage = "usage: baz.py ...\n" + parser = OptionParser("%prog ...", version="%prog 1.2") + expected_usage = "Usage: baz.py ...\n" self.assertUsage(parser, expected_usage) self.assertVersion(parser, "baz.py 1.2") self.assertHelp(parser, expected_usage + "\n" + - "options:\n" + "Options:\n" " --version show program's version number and exit\n" " -h, --help show this help message and exit\n") finally: @@ -505,7 +548,7 @@ class TestProgName(BaseTest): usage="%prog arg arg") parser.remove_option("-h") parser.remove_option("--version") - expected_usage = "usage: thingy arg arg\n" + expected_usage = "Usage: thingy arg arg\n" self.assertUsage(parser, expected_usage) self.assertVersion(parser, "thingy 0.1") self.assertHelp(parser, expected_usage + "\n") @@ -515,9 +558,9 @@ class TestExpandDefaults(BaseTest): def setUp(self): self.parser = OptionParser(prog="test") self.help_prefix = """\ -usage: test [options] +Usage: test [options] -options: +Options: -h, --help show this help message and exit """ self.file_help = "read from FILE [default: %default]" @@ -699,13 +742,16 @@ class TestStandard(BaseTest): self.assertParseOK(["-a", "--", "foo", "bar"], {'a': "--", 'boo': None, 'foo': None}, ["foo", "bar"]), + self.assertParseOK(["-a", "--", "--foo", "bar"], + {'a': "--", 'boo': None, 'foo': ["bar"]}, + []), def test_short_option_joined_and_separator(self): self.assertParseOK(["-ab", "--", "--foo", "bar"], {'a': "b", 'boo': None, 'foo': None}, ["--foo", "bar"]), - def test_invalid_option_becomes_positional_arg(self): + def test_hyphen_becomes_positional_arg(self): self.assertParseOK(["-ab", "-", "--foo", "bar"], {'a': "b", 'boo': None, 'foo': ["bar"]}, ["-"]) @@ -870,6 +916,8 @@ class TestMultipleArgsAppend(BaseTest): type="float", dest="point") self.parser.add_option("-f", "--foo", action="append", nargs=2, type="int", dest="foo") + self.parser.add_option("-z", "--zero", action="append_const", + dest="foo", const=(0, 0)) def test_nargs_append(self): self.assertParseOK(["-f", "4", "-3", "blah", "--foo", "1", "666"], @@ -885,6 +933,11 @@ class TestMultipleArgsAppend(BaseTest): {'point': None, 'foo':[(3, 4)]}, []) + def test_nargs_append_const(self): + self.assertParseOK(["--zero", "--foo", "3", "4", "-z"], + {'point': None, 'foo':[(0, 0), (3, 4), (0, 0)]}, + []) + class TestVersion(BaseTest): def test_version(self): self.parser = InterceptingOptionParser(usage=SUPPRESS_USAGE, @@ -960,8 +1013,14 @@ class TestExtendAddTypes(BaseTest): self.parser.add_option("-a", None, type="string", dest="a") self.parser.add_option("-f", "--file", type="file", dest="file") + def tearDown(self): + if os.path.isdir(test_support.TESTFN): + os.rmdir(test_support.TESTFN) + elif os.path.isfile(test_support.TESTFN): + os.unlink(test_support.TESTFN) + class MyOption (Option): - def check_file (option, opt, value): + def check_file(option, opt, value): if not os.path.exists(value): raise OptionValueError("%s: file does not exist" % value) elif not os.path.isfile(value): @@ -972,25 +1031,23 @@ class TestExtendAddTypes(BaseTest): TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER) TYPE_CHECKER["file"] = check_file - def test_extend_file(self): + def test_filetype_ok(self): open(test_support.TESTFN, "w").close() self.assertParseOK(["--file", test_support.TESTFN, "-afoo"], {'file': test_support.TESTFN, 'a': 'foo'}, []) - os.unlink(test_support.TESTFN) - - def test_extend_file_nonexistent(self): + def test_filetype_noexist(self): self.assertParseFail(["--file", test_support.TESTFN, "-afoo"], "%s: file does not exist" % test_support.TESTFN) - def test_file_irregular(self): + def test_filetype_notfile(self): os.mkdir(test_support.TESTFN) self.assertParseFail(["--file", test_support.TESTFN, "-afoo"], "%s: not a regular file" % test_support.TESTFN) - os.rmdir(test_support.TESTFN) + class TestExtendAddActions(BaseTest): def setUp(self): @@ -1003,7 +1060,7 @@ class TestExtendAddActions(BaseTest): STORE_ACTIONS = Option.STORE_ACTIONS + ("extend",) TYPED_ACTIONS = Option.TYPED_ACTIONS + ("extend",) - def take_action (self, action, dest, opt, value, values, parser): + def take_action(self, action, dest, opt, value, values, parser): if action == "extend": lvalue = value.split(",") values.ensure_value(dest, []).extend(lvalue) @@ -1072,7 +1129,7 @@ class TestCallback(BaseTest): callback=lambda: None, type="string", help="foo") - expected_help = ("options:\n" + expected_help = ("Options:\n" " -t TEST, --test=TEST foo\n") self.assertHelp(parser, expected_help) @@ -1085,7 +1142,7 @@ class TestCallbackExtraArgs(BaseTest): dest="points", default=[])] self.parser = OptionParser(option_list=options) - def process_tuple (self, option, opt, value, parser_, len, type): + def process_tuple(self, option, opt, value, parser_, len, type): self.assertEqual(len, 3) self.assert_(type is int) @@ -1110,7 +1167,7 @@ class TestCallbackMeddleArgs(BaseTest): self.parser = OptionParser(option_list=options) # Callback that meddles in rargs, largs - def process_n (self, option, opt, value, parser_): + def process_n(self, option, opt, value, parser_): # option is -3, -5, etc. nargs = int(opt[1:]) rargs = parser_.rargs @@ -1139,7 +1196,7 @@ class TestCallbackManyArgs(BaseTest): callback=self.process_many, type="int")] self.parser = OptionParser(option_list=options) - def process_many (self, option, opt, value, parser_): + def process_many(self, option, opt, value, parser_): if opt == "-a": self.assertEqual(value, ("foo", "bar")) elif opt == "--apple": @@ -1162,7 +1219,7 @@ class TestCallbackCheckAbbrev(BaseTest): self.parser.add_option("--foo-bar", action="callback", callback=self.check_abbrev) - def check_abbrev (self, option, opt, value, parser): + def check_abbrev(self, option, opt, value, parser): self.assertEqual(opt, "--foo-bar") def test_abbrev_callback_expansion(self): @@ -1177,7 +1234,7 @@ class TestCallbackVarArgs(BaseTest): self.parser = InterceptingOptionParser(usage=SUPPRESS_USAGE, option_list=options) - def variable_args (self, option, opt, value, parser): + def variable_args(self, option, opt, value, parser): self.assert_(value is None) done = 0 value = [] @@ -1229,7 +1286,7 @@ class ConflictBase(BaseTest): self.parser = InterceptingOptionParser(usage=SUPPRESS_USAGE, option_list=options) - def show_version (self, option, opt, value, parser): + def show_version(self, option, opt, value, parser): parser.values.show_version = 1 class TestConflict(ConflictBase): @@ -1280,7 +1337,7 @@ class TestConflictResolve(ConflictBase): def test_conflict_resolve_help(self): self.assertOutput(["-h"], """\ -options: +Options: --verbose increment verbosity -h, --help show this help message and exit -v, --version show version @@ -1319,7 +1376,7 @@ class TestConflictOverride(BaseTest): def test_conflict_override_help(self): self.assertOutput(["-h"], """\ -options: +Options: -h, --help show this help message and exit -n, --dry-run dry run mode """) @@ -1332,9 +1389,9 @@ options: # -- Other testing. ---------------------------------------------------- _expected_help_basic = """\ -usage: bar.py [options] +Usage: bar.py [options] -options: +Options: -a APPLE throw APPLEs at basket -b NUM, --boo=NUM shout "boo!" NUM times (in order to frighten away all the evil spirits that cause trouble and mayhem) @@ -1343,9 +1400,9 @@ options: """ _expected_help_long_opts_first = """\ -usage: bar.py [options] +Usage: bar.py [options] -options: +Options: -a APPLE throw APPLEs at basket --boo=NUM, -b NUM shout "boo!" NUM times (in order to frighten away all the evil spirits that cause trouble and mayhem) @@ -1358,7 +1415,7 @@ Usage ===== bar.py [options] -options +Options ======= -a APPLE throw APPLEs at basket --boo=NUM, -b NUM shout "boo!" NUM times (in order to frighten away all the @@ -1368,9 +1425,9 @@ options """ _expected_help_short_lines = """\ -usage: bar.py [options] +Usage: bar.py [options] -options: +Options: -a APPLE throw APPLEs at basket -b NUM, --boo=NUM shout "boo!" NUM times (in order to frighten away all the evil spirits @@ -1382,15 +1439,8 @@ options: class TestHelp(BaseTest): def setUp(self): - self.orig_columns = os.environ.get('COLUMNS') self.parser = self.make_parser(80) - def tearDown(self): - if self.orig_columns is None: - del os.environ['COLUMNS'] - else: - os.environ['COLUMNS'] = self.orig_columns - def make_parser(self, columns): options = [ make_option("-a", type="string", dest='a', @@ -1419,7 +1469,7 @@ class TestHelp(BaseTest): self.assertHelpEquals(_expected_help_basic) def test_help_old_usage(self): - self.parser.set_usage("usage: %prog [options]") + self.parser.set_usage("Usage: %prog [options]") self.assertHelpEquals(_expected_help_basic) def test_help_long_opts_first(self): @@ -1449,13 +1499,13 @@ class TestHelp(BaseTest): group.add_option("-g", action="store_true", help="Group option.") self.parser.add_option_group(group) - self.assertHelpEquals("""\ -usage: bar.py [options] + expect = """\ +Usage: bar.py [options] This is the program description for bar.py. bar.py has an option group as well as single options. -options: +Options: -a APPLE throw APPLEs at basket -b NUM, --boo=NUM shout "boo!" NUM times (in order to frighten away all the evil spirits that cause trouble and mayhem) @@ -1467,9 +1517,12 @@ options: that some of them bite. -g Group option. -""") +""" + self.assertHelpEquals(expect) + self.parser.epilog = "Please report bugs to /dev/null." + self.assertHelpEquals(expect + "\nPlease report bugs to /dev/null.\n") class TestMatchAbbrev(BaseTest): @@ -1490,6 +1543,43 @@ class TestMatchAbbrev(BaseTest): BadOptionError, "ambiguous option: --f (%s?)" % possibilities) +class TestParseNumber(BaseTest): + def setUp(self): + self.parser = InterceptingOptionParser() + self.parser.add_option("-n", type=int) + self.parser.add_option("-l", type=long) + + def test_parse_num_fail(self): + self.assertRaises( + _parse_num, ("", int), {}, + ValueError, + re.compile(r"invalid literal for int().*: '?'?")) + self.assertRaises( + _parse_num, ("0xOoops", long), {}, + ValueError, + re.compile(r"invalid literal for long().*: '?0xOoops'?")) + + def test_parse_num_ok(self): + self.assertEqual(_parse_num("0", int), 0) + self.assertEqual(_parse_num("0x10", int), 16) + self.assertEqual(_parse_num("0XA", long), 10L) + self.assertEqual(_parse_num("010", long), 8L) + self.assertEqual(_parse_num("0b11", int), 3) + self.assertEqual(_parse_num("0b", long), 0L) + + def test_numeric_options(self): + self.assertParseOK(["-n", "42", "-l", "0x20"], + { "n": 42, "l": 0x20 }, []) + self.assertParseOK(["-n", "0b0101", "-l010"], + { "n": 5, "l": 8 }, []) + self.assertParseFail(["-n008"], + "option -n: invalid integer value: '008'") + self.assertParseFail(["-l0b0123"], + "option -l: invalid long integer value: '0b0123'") + self.assertParseFail(["-l", "0x12x"], + "option -l: invalid long integer value: '0x12x'") + + def _testclasses(): mod = sys.modules[__name__] return [getattr(mod, name) for name in dir(mod) if name.startswith('Test')] diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 2bc5fc0..ffc9420 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -5,6 +5,7 @@ import os import unittest import warnings +import sys from test import test_support warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__) @@ -364,6 +365,32 @@ class URandomTests (unittest.TestCase): except NotImplementedError: pass +class Win32ErrorTests(unittest.TestCase): + def test_rename(self): + self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak") + + def test_remove(self): + self.assertRaises(WindowsError, os.remove, test_support.TESTFN) + + def test_chdir(self): + self.assertRaises(WindowsError, os.chdir, test_support.TESTFN) + + def test_mkdir(self): + self.assertRaises(WindowsError, os.chdir, test_support.TESTFN) + + def test_utime(self): + self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None) + + def test_access(self): + self.assertRaises(WindowsError, os.utime, test_support.TESTFN, 0) + + def test_chmod(self): + self.assertRaises(WindowsError, os.utime, test_support.TESTFN, 0) + +if sys.platform != 'win32': + class Win32ErrorTests(unittest.TestCase): + pass + def test_main(): test_support.run_unittest( TemporaryFileTests, @@ -372,7 +399,8 @@ def test_main(): WalkTests, MakedirTests, DevNullTests, - URandomTests + URandomTests, + Win32ErrorTests ) if __name__ == "__main__": diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index 99e01b6..59e5162 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -4,13 +4,6 @@ from test.test_support import verbose, TestFailed, TestSkipped TEST_STRING_1 = "I wish to buy a fish license.\n" TEST_STRING_2 = "For my pet fish, Eric.\n" -# Solaris (at least 2.9 and 2.10) seem to have a fickle isatty(). The first -# test below, testing the result of os.openpty() for tty-ness, sometimes -# (but not always) fails. The second isatty test, in the sub-process, always -# works. Allow that fickle first test to fail on these platforms, since it -# doesn't actually affect functionality. -fickle_isatty = ["sunos5"] - if verbose: def debug(msg): print msg @@ -31,11 +24,11 @@ def normalize_output(data): # OSF/1 (Tru64) apparently turns \n into \r\r\n. if data.endswith('\r\r\n'): - return data[:-3] + '\n' + return data.replace('\r\r\n', '\n') # IRIX apparently turns \n into \r\n. if data.endswith('\r\n'): - return data[:-2] + '\n' + return data.replace('\r\n', '\n') return data @@ -54,7 +47,7 @@ def test_basic_pty(): # " An optional feature could not be imported " ... ? raise TestSkipped, "Pseudo-terminals (seemingly) not functional." - if not os.isatty(slave_fd) and sys.platform not in fickle_isatty: + if not os.isatty(slave_fd): raise TestFailed, "slave_fd is not a tty" debug("Writing to slave_fd") diff --git a/Lib/test/test_rfc822.py b/Lib/test/test_rfc822.py index 0d9f28a..6d22825 100644 --- a/Lib/test/test_rfc822.py +++ b/Lib/test/test_rfc822.py @@ -45,6 +45,10 @@ class MessageTestCase(unittest.TestCase): print 'extra parsed address:', repr(n), repr(a) continue i = i + 1 + self.assertEqual(mn, n, + "Un-expected name: %s != %s" % (`mn`, `n`)) + self.assertEqual(ma, a, + "Un-expected address: %s != %s" % (`ma`, `a`)) if mn == n and ma == a: pass else: @@ -129,6 +133,12 @@ class MessageTestCase(unittest.TestCase): 'To: person@dom.ain (User J. Person)\n\n', [('User J. Person', 'person@dom.ain')]) + def test_doublecomment(self): + # The RFC allows comments within comments in an email addr + self.check( + 'To: person@dom.ain ((User J. Person)), John Doe <foo@bar.com>\n\n', + [('User J. Person', 'person@dom.ain'), ('John Doe', 'foo@bar.com')]) + def test_twisted(self): # This one is just twisted. I don't know what the proper # result should be, but it shouldn't be to infloop, which is diff --git a/Lib/test/test_setuptools.py b/Lib/test/test_setuptools.py deleted file mode 100644 index a988303..0000000 --- a/Lib/test/test_setuptools.py +++ /dev/null @@ -1,16 +0,0 @@ -"""Tests for setuptools. - -The tests for setuptools are defined in the setuptools.tests package; -this runs them from there. -""" - -import test.test_support -from setuptools.command.test import ScanningLoader - -def test_main(): - test.test_support.run_suite( - ScanningLoader().loadTestsFromName('setuptools.tests') - ) - -if __name__ == "__main__": - test_main() diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 7c28602..6ab5a35 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -48,12 +48,12 @@ class TestShutil(unittest.TestCase): if self.errorState == 0: self.assertEqual(func, os.remove) self.assertEqual(arg, self.childpath) - self.assertEqual(exc[0], OSError) + self.failUnless(issubclass(exc[0], OSError)) self.errorState = 1 else: self.assertEqual(func, os.rmdir) self.assertEqual(arg, TESTFN) - self.assertEqual(exc[0], OSError) + self.failUnless(issubclass(exc[0], OSError)) self.errorState = 2 def test_rmtree_dont_delete_file(self): diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 6943080..2246fb6 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -9,6 +9,7 @@ import time import thread, threading import Queue import sys +import array from weakref import proxy PORT = 50007 @@ -852,8 +853,38 @@ class TestLinuxAbstractNamespace(unittest.TestCase): self.assertRaises(socket.error, s.bind, address) +class BufferIOTest(SocketConnectedTest): + """ + Test the buffer versions of socket.recv() and socket.send(). + """ + def __init__(self, methodName='runTest'): + SocketConnectedTest.__init__(self, methodName=methodName) + + def testRecvBuf(self): + buf = array.array('c', ' '*1024) + nbytes = self.cli_conn.recv_buf(buf) + self.assertEqual(nbytes, len(MSG)) + msg = buf.tostring()[:len(MSG)] + self.assertEqual(msg, MSG) + + def _testRecvBuf(self): + buf = buffer(MSG) + self.serv_conn.send(buf) + + def testRecvFromBuf(self): + buf = array.array('c', ' '*1024) + nbytes, addr = self.cli_conn.recvfrom_buf(buf) + self.assertEqual(nbytes, len(MSG)) + msg = buf.tostring()[:len(MSG)] + self.assertEqual(msg, MSG) + + def _testRecvFromBuf(self): + buf = buffer(MSG) + self.serv_conn.send(buf) + def test_main(): - tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions] + tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions, + BufferIOTest] if sys.platform != 'mac': tests.extend([ BasicUDPTest, UDPTimeoutTest ]) diff --git a/Lib/test/test_sqlite.py b/Lib/test/test_sqlite.py index 1b1d0e5..f033772 100644 --- a/Lib/test/test_sqlite.py +++ b/Lib/test/test_sqlite.py @@ -6,11 +6,12 @@ try: except ImportError: raise TestSkipped('no sqlite available') from sqlite3.test import (dbapi, types, userfunctions, - factory, transactions) + factory, transactions, hooks, regression) def test_main(): run_unittest(dbapi.suite(), types.suite(), userfunctions.suite(), - factory.suite(), transactions.suite()) + factory.suite(), transactions.suite(), hooks.suite(), + regression.suite()) if __name__ == "__main__": test_main() diff --git a/Lib/test/test_stringprep.py b/Lib/test/test_stringprep.py index 4459689..2baf4a5 100644 --- a/Lib/test/test_stringprep.py +++ b/Lib/test/test_stringprep.py @@ -2,7 +2,6 @@ # Since we don't have them, this test checks only a few codepoints. from test.test_support import verify, vereq -import sha import stringprep from stringprep import * @@ -73,6 +72,7 @@ verify(not in_table_d2(u"\u0040")) # unicode database. Instead, stringprep.py asserts the version of # the database. +# import hashlib # predicates = [k for k in dir(stringprep) if k.startswith("in_table")] # predicates.sort() # for p in predicates: @@ -83,6 +83,6 @@ verify(not in_table_d2(u"\u0040")) # if f(unichr(i)): # data[i] = "1" # data = "".join(data) -# h = sha.sha() +# h = hashlib.sha1() # h.update(data) -# print p,h.hexdigest() +# print p, h.hexdigest() diff --git a/Lib/test/test_struct.py b/Lib/test/test_struct.py index 9332466..7981a52 100644 --- a/Lib/test/test_struct.py +++ b/Lib/test/test_struct.py @@ -1,5 +1,8 @@ from test.test_support import TestFailed, verbose, verify +import test.test_support import struct +import array +import unittest import sys ISBIGENDIAN = sys.byteorder == "big" @@ -7,6 +10,8 @@ del sys verify((struct.pack('=i', 1)[0] == chr(0)) == ISBIGENDIAN, "bigendian determination appears wrong") +PY_STRUCT_RANGE_CHECKING = 1 + def string_reverse(s): chars = list(s) chars.reverse() @@ -263,7 +268,7 @@ class IntTester: else: # x is out of range -- verify pack realizes that. - if code in self.BUGGY_RANGE_CHECK: + if not PY_STRUCT_RANGE_CHECKING and code in self.BUGGY_RANGE_CHECK: if verbose: print "Skipping buggy range check for code", code else: @@ -318,7 +323,7 @@ class IntTester: else: # x is out of range -- verify pack realizes that. - if code in self.BUGGY_RANGE_CHECK: + if not PY_STRUCT_RANGE_CHECKING and code in self.BUGGY_RANGE_CHECK: if verbose: print "Skipping buggy range check for code", code else: @@ -437,3 +442,97 @@ def test_705836(): TestFailed("expected OverflowError") test_705836() + +def test_1229380(): + import sys + for endian in ('', '>', '<'): + for cls in (int, long): + for fmt in ('B', 'H', 'I', 'L'): + any_err(struct.pack, endian + fmt, cls(-1)) + + any_err(struct.pack, endian + 'B', cls(300)) + any_err(struct.pack, endian + 'H', cls(70000)) + + any_err(struct.pack, endian + 'I', sys.maxint * 4L) + any_err(struct.pack, endian + 'L', sys.maxint * 4L) + +if PY_STRUCT_RANGE_CHECKING: + test_1229380() + +class PackBufferTestCase(unittest.TestCase): + """ + Test the packing methods that work on buffers. + """ + + def test_unpack_from( self ): + test_string = 'abcd01234' + fmt = '4s' + s = struct.Struct(fmt) + for cls in (str, buffer): + data = cls(test_string) + self.assertEquals(s.unpack_from(data), ('abcd',)) + self.assertEquals(s.unpack_from(data, 2), ('cd01',)) + self.assertEquals(s.unpack_from(data, 4), ('0123',)) + for i in xrange(6): + self.assertEquals(s.unpack_from(data, i), (data[i:i+4],)) + for i in xrange(6, len(test_string) + 1): + simple_err(s.unpack_from, data, i) + for cls in (str, buffer): + data = cls(test_string) + self.assertEquals(struct.unpack_from(fmt, data), ('abcd',)) + self.assertEquals(struct.unpack_from(fmt, data, 2), ('cd01',)) + self.assertEquals(struct.unpack_from(fmt, data, 4), ('0123',)) + for i in xrange(6): + self.assertEquals(struct.unpack_from(fmt, data, i), + (data[i:i+4],)) + for i in xrange(6, len(test_string) + 1): + simple_err(struct.unpack_from, fmt, data, i) + + def test_pack_to( self ): + test_string = 'Reykjavik rocks, eow!' + writable_buf = array.array('c', ' '*100) + fmt = '21s' + s = struct.Struct(fmt) + + # Test without offset + s.pack_to(writable_buf, 0, test_string) + from_buf = writable_buf.tostring()[:len(test_string)] + self.assertEquals(from_buf, test_string) + + # Test with offset. + s.pack_to(writable_buf, 10, test_string) + from_buf = writable_buf.tostring()[:len(test_string)+10] + self.assertEquals(from_buf, (test_string[:10] + test_string)) + + # Go beyond boundaries. + small_buf = array.array('c', ' '*10) + self.assertRaises(struct.error, s.pack_to, small_buf, 0, test_string) + self.assertRaises(struct.error, s.pack_to, small_buf, 2, test_string) + + def test_pack_to_fn( self ): + test_string = 'Reykjavik rocks, eow!' + writable_buf = array.array('c', ' '*100) + fmt = '21s' + pack_to = lambda *args: struct.pack_to(fmt, *args) + + # Test without offset + pack_to(writable_buf, 0, test_string) + from_buf = writable_buf.tostring()[:len(test_string)] + self.assertEquals(from_buf, test_string) + + # Test with offset. + pack_to(writable_buf, 10, test_string) + from_buf = writable_buf.tostring()[:len(test_string)+10] + self.assertEquals(from_buf, (test_string[:10] + test_string)) + + # Go beyond boundaries. + small_buf = array.array('c', ' '*10) + self.assertRaises(struct.error, pack_to, small_buf, 0, test_string) + self.assertRaises(struct.error, pack_to, small_buf, 2, test_string) + + +def test_main(): + test.test_support.run_unittest(PackBufferTestCase) + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index c351ee9..edf5bd0 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -347,7 +347,7 @@ class ProcessTestCase(unittest.TestCase): stdout=subprocess.PIPE, universal_newlines=1) stdout = p.stdout.read() - if hasattr(open, 'newlines'): + if hasattr(file, 'newlines'): # Interpreter with universal newline support self.assertEqual(stdout, "line1\nline2\nline3\nline4\nline5\nline6") @@ -374,7 +374,7 @@ class ProcessTestCase(unittest.TestCase): stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=1) (stdout, stderr) = p.communicate() - if hasattr(open, 'newlines'): + if hasattr(file, 'newlines'): # Interpreter with universal newline support self.assertEqual(stdout, "line1\nline2\nline3\nline4\nline5\nline6") diff --git a/Lib/test/test_sundry.py b/Lib/test/test_sundry.py index af13684..f19467c 100644 --- a/Lib/test/test_sundry.py +++ b/Lib/test/test_sundry.py @@ -50,11 +50,7 @@ import pstats import py_compile import pydoc import rexec -try: - import rlcompleter # not available on Windows -except ImportError: - if verbose: - print "skipping rlcompleter" +import rlcompleter import sched import smtplib import sndhdr diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index cc71366..2d08f4d 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -30,7 +30,9 @@ class ResourceDenied(TestSkipped): """ verbose = 1 # Flag set to 0 by regrtest.py -use_resources = None # Flag set to [] by regrtest.py +use_resources = None # Flag set to [] by regrtest.py +max_memuse = 0 # Disable bigmem tests (they will still be run with + # small sizes, to make sure they work.) # _original_stdout is meant to hold stdout at the time regrtest began. # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. @@ -250,6 +252,108 @@ def open_urlresource(url): return open(fn) #======================================================================= +# Decorator for running a function in a different locale, correctly resetting +# it afterwards. + +def run_with_locale(catstr, *locales): + def decorator(func): + def inner(*args, **kwds): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: + try: + locale.setlocale(category, loc) + break + except: + pass + + # now run the function, resetting the locale on exceptions + try: + return func(*args, **kwds) + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) + inner.func_name = func.func_name + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Big-memory-test support. Separate from 'resources' because memory use should be configurable. + +# Some handy shorthands. Note that these are used for byte-limits as well +# as size-limits, in the various bigmem tests +_1M = 1024*1024 +_1G = 1024 * _1M +_2G = 2 * _1G + +def set_memlimit(limit): + import re + global max_memuse + sizes = { + 'k': 1024, + 'm': _1M, + 'g': _1G, + 't': 1024*_1G, + } + m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + re.IGNORECASE | re.VERBOSE) + if m is None: + raise ValueError('Invalid memory limit %r' % (limit,)) + memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) + if memlimit < 2.5*_1G: + raise ValueError('Memory limit %r too low to be useful' % (limit,)) + max_memuse = memlimit + +def bigmemtest(minsize, memuse, overhead=5*_1M): + """Decorator for bigmem tests. + + 'minsize' is the minimum useful size for the test (in arbitrary, + test-interpreted units.) 'memuse' is the number of 'bytes per size' for + the test, or a good estimate of it. 'overhead' specifies fixed overhead, + independant of the testsize, and defaults to 5Mb. + + The decorator tries to guess a good value for 'size' and passes it to + the decorated test function. If minsize * memuse is more than the + allowed memory use (as defined by max_memuse), the test is skipped. + Otherwise, minsize is adjusted upward to use up to max_memuse. + """ + def decorator(f): + def wrapper(self): + if not max_memuse: + # If max_memuse is 0 (the default), + # we still want to run the tests with size set to a few kb, + # to make sure they work. We still want to avoid using + # too much memory, though, but we do that noisily. + maxsize = 5147 + self.failIf(maxsize * memuse + overhead > 20 * _1M) + else: + maxsize = int((max_memuse - overhead) / memuse) + if maxsize < minsize: + # Really ought to print 'test skipped' or something + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + return + # Try to keep some breathing room in memory use + maxsize = max(maxsize - 50 * _1M, minsize) + return f(self, maxsize) + wrapper.minsize = minsize + wrapper.memuse = memuse + wrapper.overhead = overhead + return wrapper + return decorator + +#======================================================================= # Preliminary PyUNIT integration. import unittest diff --git a/Lib/test/test_syntax.py b/Lib/test/test_syntax.py index b61debf..dc7a16d 100644 --- a/Lib/test/test_syntax.py +++ b/Lib/test/test_syntax.py @@ -86,13 +86,16 @@ SyntaxError: can't assign to literal (<doctest test.test_syntax[11]>, line 1) Traceback (most recent call last): SyntaxError: can't assign to operator (<doctest test.test_syntax[12]>, line 1) +>>> a if 1 else b = 1 +Traceback (most recent call last): +SyntaxError: can't assign to conditional expression (<doctest test.test_syntax[13]>, line 1) From compiler_complex_args(): >>> def f(None=1): ... pass Traceback (most recent call last): -SyntaxError: assignment to None (<doctest test.test_syntax[13]>, line 1) +SyntaxError: assignment to None (<doctest test.test_syntax[14]>, line 1) From ast_for_arguments(): @@ -100,22 +103,22 @@ From ast_for_arguments(): >>> def f(x, y=1, z): ... pass Traceback (most recent call last): -SyntaxError: non-default argument follows default argument (<doctest test.test_syntax[14]>, line 1) +SyntaxError: non-default argument follows default argument (<doctest test.test_syntax[15]>, line 1) >>> def f(x, None): ... pass Traceback (most recent call last): -SyntaxError: assignment to None (<doctest test.test_syntax[15]>, line 1) +SyntaxError: assignment to None (<doctest test.test_syntax[16]>, line 1) >>> def f(*None): ... pass Traceback (most recent call last): -SyntaxError: assignment to None (<doctest test.test_syntax[16]>, line 1) +SyntaxError: assignment to None (<doctest test.test_syntax[17]>, line 1) >>> def f(**None): ... pass Traceback (most recent call last): -SyntaxError: assignment to None (<doctest test.test_syntax[17]>, line 1) +SyntaxError: assignment to None (<doctest test.test_syntax[18]>, line 1) From ast_for_funcdef(): @@ -123,7 +126,7 @@ From ast_for_funcdef(): >>> def None(x): ... pass Traceback (most recent call last): -SyntaxError: assignment to None (<doctest test.test_syntax[18]>, line 1) +SyntaxError: assignment to None (<doctest test.test_syntax[19]>, line 1) From ast_for_call(): @@ -135,7 +138,7 @@ From ast_for_call(): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> f(x for x in L, 1) Traceback (most recent call last): -SyntaxError: Generator expression must be parenthesized if not sole argument (<doctest test.test_syntax[22]>, line 1) +SyntaxError: Generator expression must be parenthesized if not sole argument (<doctest test.test_syntax[23]>, line 1) >>> f((x for x in L), 1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] @@ -167,7 +170,7 @@ SyntaxError: Generator expression must be parenthesized if not sole argument (<d ... i244, i245, i246, i247, i248, i249, i250, i251, i252, ... i253, i254, i255) Traceback (most recent call last): -SyntaxError: more than 255 arguments (<doctest test.test_syntax[24]>, line 1) +SyntaxError: more than 255 arguments (<doctest test.test_syntax[25]>, line 1) The actual error cases counts positional arguments, keyword arguments, and generator expression arguments separately. This test combines the @@ -201,37 +204,37 @@ three. ... (x for x in i244), i245, i246, i247, i248, i249, i250, i251, ... i252=1, i253=1, i254=1, i255=1) Traceback (most recent call last): -SyntaxError: more than 255 arguments (<doctest test.test_syntax[25]>, line 1) +SyntaxError: more than 255 arguments (<doctest test.test_syntax[26]>, line 1) >>> f(lambda x: x[0] = 3) Traceback (most recent call last): -SyntaxError: lambda cannot contain assignment (<doctest test.test_syntax[26]>, line 1) +SyntaxError: lambda cannot contain assignment (<doctest test.test_syntax[27]>, line 1) The grammar accepts any test (basically, any expression) in the keyword slot of a call site. Test a few different options. >>> f(x()=2) Traceback (most recent call last): -SyntaxError: keyword can't be an expression (<doctest test.test_syntax[27]>, line 1) +SyntaxError: keyword can't be an expression (<doctest test.test_syntax[28]>, line 1) >>> f(a or b=1) Traceback (most recent call last): -SyntaxError: keyword can't be an expression (<doctest test.test_syntax[28]>, line 1) +SyntaxError: keyword can't be an expression (<doctest test.test_syntax[29]>, line 1) >>> f(x.y=1) Traceback (most recent call last): -SyntaxError: keyword can't be an expression (<doctest test.test_syntax[29]>, line 1) +SyntaxError: keyword can't be an expression (<doctest test.test_syntax[30]>, line 1) From ast_for_expr_stmt(): >>> (x for x in x) += 1 Traceback (most recent call last): -SyntaxError: augmented assignment to generator expression not possible (<doctest test.test_syntax[30]>, line 1) +SyntaxError: augmented assignment to generator expression not possible (<doctest test.test_syntax[31]>, line 1) >>> None += 1 Traceback (most recent call last): -SyntaxError: assignment to None (<doctest test.test_syntax[31]>, line 1) +SyntaxError: assignment to None (<doctest test.test_syntax[32]>, line 1) >>> f() += 1 Traceback (most recent call last): -SyntaxError: illegal expression for augmented assignment (<doctest test.test_syntax[32]>, line 1) +SyntaxError: illegal expression for augmented assignment (<doctest test.test_syntax[33]>, line 1) """ import re @@ -243,15 +246,18 @@ from test import test_support class SyntaxTestCase(unittest.TestCase): def _check_error(self, code, errtext, - filename="<testcase>", mode="exec"): + filename="<testcase>", mode="exec", subclass=None): """Check that compiling code raises SyntaxError with errtext. errtest is a regular expression that must be present in the - test of the exception raised. + test of the exception raised. If subclass is specified it + is the expected subclass of SyntaxError (e.g. IndentationError). """ try: compile(code, filename, mode) except SyntaxError, err: + if subclass and not isinstance(err, subclass): + self.fail("SyntaxError is not a %s" % subclass.__name__) mo = re.search(errtext, str(err)) if mo is None: self.fail("SyntaxError did not contain '%r'" % (errtext,)) @@ -290,6 +296,22 @@ class SyntaxTestCase(unittest.TestCase): :""") self._check_error(source, "nested scope") + def test_unexpected_indent(self): + self._check_error("foo()\n bar()\n", "unexpected indent", + subclass=IndentationError) + + def test_no_indent(self): + self._check_error("if 1:\nfoo()", "expected an indented block", + subclass=IndentationError) + + def test_bad_outdent(self): + self._check_error("if 1:\n foo()\n bar()", + "unindent does not match .* level", + subclass=IndentationError) + + def test_kwargs_last(self): + self._check_error("int(base=10, '2')", "non-keyword arg") + def test_main(): test_support.run_unittest(SyntaxTestCase) from test import test_syntax diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 98349c4..8ee0f41 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -2,6 +2,7 @@ import sys import os import shutil import tempfile +import StringIO import unittest import tarfile @@ -25,7 +26,7 @@ def path(path): testtar = path("testtar.tar") tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir") tempname = test_support.TESTFN -membercount = 10 +membercount = 12 def tarname(comp=""): if not comp: @@ -86,7 +87,9 @@ class ReadTest(BaseTest): if self.sep != "|": filename = "0-REGTYPE-TEXT" self.tar.extract(filename, dirname()) - lines1 = file(os.path.join(dirname(), filename), "rU").readlines() + f = open(os.path.join(dirname(), filename), "rU") + lines1 = f.readlines() + f.close() lines2 = self.tar.extractfile(filename).readlines() self.assert_(lines1 == lines2, "_FileObject.readline() does not work correctly") @@ -96,7 +99,9 @@ class ReadTest(BaseTest): if self.sep != "|": filename = "0-REGTYPE-TEXT" self.tar.extract(filename, dirname()) - lines1 = file(os.path.join(dirname(), filename), "rU").readlines() + f = open(os.path.join(dirname(), filename), "rU") + lines1 = f.readlines() + f.close() lines2 = [line for line in self.tar.extractfile(filename)] self.assert_(lines1 == lines2, "ExFileObject iteration does not work correctly") @@ -107,7 +112,9 @@ class ReadTest(BaseTest): if self.sep != "|": filename = "0-REGTYPE" self.tar.extract(filename, dirname()) - data = file(os.path.join(dirname(), filename), "rb").read() + f = open(os.path.join(dirname(), filename), "rb") + data = f.read() + f.close() tarinfo = self.tar.getmember(filename) fobj = self.tar.extractfile(tarinfo) @@ -155,7 +162,7 @@ class ReadTest(BaseTest): tarinfo = tarfile.TarInfo("directory/") tarinfo.type = tarfile.REGTYPE - fobj = file(filename, "w") + fobj = open(filename, "w") fobj.write(tarinfo.tobuf()) fobj.close() @@ -209,8 +216,21 @@ class ReadStreamTest(ReadTest): self.assert_(v2 is not None, "stream.extractfile() failed") self.assert_(v1.read() == v2.read(), "stream extraction failed") + tar.close() stream.close() +class ReadDetectTest(ReadTest): + + def setUp(self): + self.tar = tarfile.open(tarname(self.comp), self.mode) + +class ReadDetectFileobjTest(ReadTest): + + def setUp(self): + name = tarname(self.comp) + self.tar = tarfile.open(name, mode=self.mode, + fileobj=open(name, "rb")) + class ReadAsteriskTest(ReadTest): def setUp(self): @@ -254,7 +274,7 @@ class WriteTest(BaseTest): if not tarinfo.isreg(): continue f = self.src.extractfile(tarinfo) - if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME: + if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME and "/" not in tarinfo.name: self.assertRaises(ValueError, self.dst.addfile, tarinfo, f) else: @@ -273,15 +293,22 @@ class WriteSize0Test(BaseTest): def test_file(self): path = os.path.join(self.tmpdir, "file") - file(path, "w") + f = open(path, "w") + f.close() tarinfo = self.dst.gettarinfo(path) self.assertEqual(tarinfo.size, 0) - file(path, "w").write("aaa") + f = open(path, "w") + f.write("aaa") + f.close() tarinfo = self.dst.gettarinfo(path) self.assertEqual(tarinfo.size, 3) def test_directory(self): path = os.path.join(self.tmpdir, "directory") + if os.path.exists(path): + # This shouldn't be necessary, but is <wink> if a previous + # run was killed in mid-stream. + shutil.rmtree(path) os.mkdir(path) tarinfo = self.dst.gettarinfo(path) self.assertEqual(tarinfo.size, 0) @@ -385,6 +412,52 @@ class WriteGNULongTest(unittest.TestCase): self._test(("longnam/" * 127) + "longname_", ("longlnk/" * 127) + "longlink_") +class ReadGNULongTest(unittest.TestCase): + + def setUp(self): + self.tar = tarfile.open(tarname()) + + def tearDown(self): + self.tar.close() + + def test_1471427(self): + """Test reading of longname (bug #1471427). + """ + name = "test/" * 20 + "0-REGTYPE" + try: + tarinfo = self.tar.getmember(name) + except KeyError: + tarinfo = None + self.assert_(tarinfo is not None, "longname not found") + self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") + + def test_read_name(self): + name = ("0-LONGNAME-" * 10)[:101] + try: + tarinfo = self.tar.getmember(name) + except KeyError: + tarinfo = None + self.assert_(tarinfo is not None, "longname not found") + + def test_read_link(self): + link = ("1-LONGLINK-" * 10)[:101] + name = ("0-LONGNAME-" * 10)[:101] + try: + tarinfo = self.tar.getmember(link) + except KeyError: + tarinfo = None + self.assert_(tarinfo is not None, "longlink not found") + self.assert_(tarinfo.linkname == name, "linkname wrong") + + def test_truncated_longname(self): + f = open(tarname()) + fobj = StringIO.StringIO(f.read(1024)) + f.close() + tar = tarfile.open(name="foo.tar", fileobj=fobj) + self.assert_(len(tar.getmembers()) == 0, "") + tar.close() + + class ExtractHardlinkTest(BaseTest): def test_hardlink(self): @@ -420,7 +493,9 @@ class CreateHardlinkTest(BaseTest): if os.path.exists(self.bar): os.remove(self.bar) - file(self.foo, "w").write("foo") + f = open(self.foo, "w") + f.write("foo") + f.close() self.tar.add(self.foo) def test_add_twice(self): @@ -459,6 +534,10 @@ class WriteTestGzip(WriteTest): comp = "gz" class WriteStreamTestGzip(WriteStreamTest): comp = "gz" +class ReadDetectTestGzip(ReadDetectTest): + comp = "gz" +class ReadDetectFileobjTestGzip(ReadDetectFileobjTest): + comp = "gz" class ReadAsteriskTestGzip(ReadAsteriskTest): comp = "gz" class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest): @@ -482,6 +561,10 @@ if bz2: comp = "bz2" class WriteStreamTestBzip2(WriteStreamTestGzip): comp = "bz2" + class ReadDetectTestBzip2(ReadDetectTest): + comp = "bz2" + class ReadDetectFileobjTestBzip2(ReadDetectFileobjTest): + comp = "bz2" class ReadAsteriskTestBzip2(ReadAsteriskTest): comp = "bz2" class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest): @@ -495,23 +578,34 @@ if not gzip: del WriteStreamTestGzip def test_main(): + # Create archive. + f = open(tarname(), "rb") + fguts = f.read() + f.close() if gzip: # create testtar.tar.gz - gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read()) + tar = gzip.open(tarname("gz"), "wb") + tar.write(fguts) + tar.close() if bz2: # create testtar.tar.bz2 - bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read()) + tar = bz2.BZ2File(tarname("bz2"), "wb") + tar.write(fguts) + tar.close() tests = [ FileModeTest, ReadTest, ReadStreamTest, + ReadDetectTest, + ReadDetectFileobjTest, ReadAsteriskTest, ReadStreamAsteriskTest, WriteTest, WriteSize0Test, WriteStreamTest, WriteGNULongTest, + ReadGNULongTest, ] if hasattr(os, "link"): @@ -522,6 +616,7 @@ def test_main(): tests.extend([ ReadTestGzip, ReadStreamTestGzip, WriteTestGzip, WriteStreamTestGzip, + ReadDetectTestGzip, ReadDetectFileobjTestGzip, ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip ]) @@ -529,6 +624,7 @@ def test_main(): tests.extend([ ReadTestBzip2, ReadStreamTestBzip2, WriteTestBzip2, WriteStreamTestBzip2, + ReadDetectTestBzip2, ReadDetectFileobjTestBzip2, ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2 ]) try: diff --git a/Lib/test/test_tcl.py b/Lib/test/test_tcl.py index 2eeabc1..e3fbf98 100644 --- a/Lib/test/test_tcl.py +++ b/Lib/test/test_tcl.py @@ -124,6 +124,7 @@ class TclTest(unittest.TestCase): self.assertRaises(TclError,tcl.winfo_geometry) tcl.loadtk() self.assertEqual('1x1+0+0', tcl.winfo_geometry()) + tcl.destroy() def testLoadTkFailure(self): import os diff --git a/Lib/test/test_threaded_import.py b/Lib/test/test_threaded_import.py index dc27d4e..0642d25 100644 --- a/Lib/test/test_threaded_import.py +++ b/Lib/test/test_threaded_import.py @@ -6,7 +6,7 @@ # randrange, and then Python hangs. import thread -from test.test_support import verbose, TestSkipped +from test.test_support import verbose, TestSkipped, TestFailed critical_section = thread.allocate_lock() done = thread.allocate_lock() @@ -25,6 +25,23 @@ def task(): if finished: done.release() +def test_import_hangers(): + import sys + if verbose: + print "testing import hangers ...", + + from test import threaded_import_hangers + + try: + if threaded_import_hangers.errors: + raise TestFailed(threaded_import_hangers.errors) + elif verbose: + print "OK." + finally: + # In case this test is run again, make sure the helper module + # gets loaded from scratch again. + del sys.modules['test.threaded_import_hangers'] + # Tricky: When regrtest imports this module, the thread running regrtest # grabs the import lock and won't let go of it until this module returns. # All other threads attempting an import hang for the duration. Since @@ -53,5 +70,7 @@ def test_main(): # magic name! see above print "OK." done.release() + test_import_hangers() + if __name__ == "__main__": test_main() diff --git a/Lib/test/test_traceback.py b/Lib/test/test_traceback.py index 22c0456..1b59f98 100644 --- a/Lib/test/test_traceback.py +++ b/Lib/test/test_traceback.py @@ -103,6 +103,12 @@ def test(): import sys sys.exc_traceback.__members__ + def test_base_exception(self): + # Test that exceptions derived from BaseException are formatted right + e = KeyboardInterrupt() + lst = traceback.format_exception_only(e.__class__, e) + self.assertEqual(lst, ['KeyboardInterrupt\n']) + def test_main(): run_unittest(TracebackCases) diff --git a/Lib/test/test_unicode.py b/Lib/test/test_unicode.py index c7113b5..34f9371 100644 --- a/Lib/test/test_unicode.py +++ b/Lib/test/test_unicode.py @@ -411,19 +411,10 @@ class UnicodeTest( return u'\u1234' self.assertEqual('%s' % Wrapper(), u'\u1234') + @test_support.run_with_locale('LC_ALL', 'de_DE', 'fr_FR') def test_format_float(self): - try: - import locale - orig_locale = locale.setlocale(locale.LC_ALL) - locale.setlocale(locale.LC_ALL, 'de_DE') - except (ImportError, locale.Error): - return # skip if we can't set locale - - try: - # should not format with a comma, but always with C locale - self.assertEqual(u'1.0', u'%.1f' % 1.0) - finally: - locale.setlocale(locale.LC_ALL, orig_locale) + # should not format with a comma, but always with C locale + self.assertEqual(u'1.0', u'%.1f' % 1.0) def test_constructor(self): # unicode(obj) tests (this maps to PyObject_Unicode() at C level) diff --git a/Lib/test/test_unicodedata.py b/Lib/test/test_unicodedata.py index f84caad..0023bf4 100644 --- a/Lib/test/test_unicodedata.py +++ b/Lib/test/test_unicodedata.py @@ -6,7 +6,7 @@ """#" import unittest, test.test_support -import sha +import hashlib encoding = 'utf-8' @@ -16,10 +16,10 @@ encoding = 'utf-8' class UnicodeMethodsTest(unittest.TestCase): # update this, if the database changes - expectedchecksum = 'a6555cd209d960dcfa17bfdce0c96d91cfa9a9ba' + expectedchecksum = 'c198ed264497f108434b3f576d4107237221cc8a' def test_method_checksum(self): - h = sha.sha() + h = hashlib.sha1() for i in range(65536): char = unichr(i) data = [ @@ -75,11 +75,11 @@ class UnicodeDatabaseTest(unittest.TestCase): class UnicodeFunctionsTest(UnicodeDatabaseTest): # update this, if the database changes - expectedchecksum = 'b45b79f3203ee1a896d9b5655484adaff5d4964b' + expectedchecksum = '4e389f97e9f88b8b7ab743121fd643089116f9f2' def test_function_checksum(self): data = [] - h = sha.sha() + h = hashlib.sha1() for i in range(0x10000): char = unichr(i) diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 64a2ee9..c8f19bc 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -10,10 +10,7 @@ from urllib2 import Request, OpenerDirector # XXX # Request # CacheFTPHandler (hard to write) -# parse_keqv_list, parse_http_list (I'm leaving this for Anthony Baxter -# and Greg Stein, since they're doing Digest Authentication) -# Authentication stuff (ditto) -# CustomProxy, CustomProxyHandler +# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler class TrivialTests(unittest.TestCase): def test_trivial(self): @@ -49,6 +46,70 @@ class TrivialTests(unittest.TestCase): self.assertEquals(urllib2.parse_http_list(string), list) +def test_password_manager(self): + """ + >>> mgr = urllib2.HTTPPasswordMgr() + >>> add = mgr.add_password + >>> add("Some Realm", "http://example.com/", "joe", "password") + >>> add("Some Realm", "http://example.com/ni", "ni", "ni") + >>> add("c", "http://example.com/foo", "foo", "ni") + >>> add("c", "http://example.com/bar", "bar", "nini") + >>> add("b", "http://example.com/", "first", "blah") + >>> add("b", "http://example.com/", "second", "spam") + >>> add("a", "http://example.com", "1", "a") + >>> add("Some Realm", "http://c.example.com:3128", "3", "c") + >>> add("Some Realm", "d.example.com", "4", "d") + >>> add("Some Realm", "e.example.com:3128", "5", "e") + + >>> mgr.find_user_password("Some Realm", "example.com") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com/") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com/spam") + ('joe', 'password') + >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam") + ('joe', 'password') + >>> mgr.find_user_password("c", "http://example.com/foo") + ('foo', 'ni') + >>> mgr.find_user_password("c", "http://example.com/bar") + ('bar', 'nini') + + Currently, we use the highest-level path where more than one match: + + >>> mgr.find_user_password("Some Realm", "http://example.com/ni") + ('joe', 'password') + + Use latest add_password() in case of conflict: + + >>> mgr.find_user_password("b", "http://example.com/") + ('second', 'spam') + + No special relationship between a.example.com and example.com: + + >>> mgr.find_user_password("a", "http://example.com/") + ('1', 'a') + >>> mgr.find_user_password("a", "http://a.example.com/") + (None, None) + + Ports: + + >>> mgr.find_user_password("Some Realm", "c.example.com") + (None, None) + >>> mgr.find_user_password("Some Realm", "c.example.com:3128") + ('3', 'c') + >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128") + ('3', 'c') + >>> mgr.find_user_password("Some Realm", "d.example.com") + ('4', 'd') + >>> mgr.find_user_password("Some Realm", "e.example.com:3128") + ('5', 'e') + + """ + pass + + class MockOpener: addheaders = [] def open(self, req, data=None): @@ -89,6 +150,8 @@ class FakeMethod: return self.handle(self.meth_name, self.action, *args) class MockHandler: + # useful for testing handler machinery + # see add_ordered_mock_handlers() docstring handler_order = 500 def __init__(self, methods): self._define_methods(methods) @@ -161,6 +224,50 @@ def add_ordered_mock_handlers(opener, meth_spec): opener.add_handler(h) return handlers +def build_test_opener(*handler_instances): + opener = OpenerDirector() + for h in handler_instances: + opener.add_handler(h) + return opener + +class MockHTTPHandler(urllib2.BaseHandler): + # useful for testing redirections and auth + # sends supplied headers and code as first response + # sends 200 OK as second response + def __init__(self, code, headers): + self.code = code + self.headers = headers + self.reset() + def reset(self): + self._count = 0 + self.requests = [] + def http_open(self, req): + import mimetools, httplib, copy + from StringIO import StringIO + self.requests.append(copy.deepcopy(req)) + if self._count == 0: + self._count = self._count + 1 + name = httplib.responses[self.code] + msg = mimetools.Message(StringIO(self.headers)) + return self.parent.error( + "http", req, MockFile(), self.code, name, msg) + else: + self.req = req + msg = mimetools.Message(StringIO("\r\n\r\n")) + return MockResponse(200, "OK", msg, "", req.get_full_url()) + +class MockPasswordManager: + def add_password(self, realm, uri, user, password): + self.realm = realm + self.url = uri + self.user = user + self.password = password + def find_user_password(self, realm, authuri): + self.target_realm = realm + self.target_url = authuri + return self.user, self.password + + class OpenerDirectorTests(unittest.TestCase): def test_handled(self): @@ -612,33 +719,18 @@ class HandlerTests(unittest.TestCase): urllib2.HTTPRedirectHandler.max_redirections) def test_cookie_redirect(self): - class MockHTTPHandler(urllib2.HTTPHandler): - def __init__(self): self._count = 0 - def http_open(self, req): - import mimetools - from StringIO import StringIO - if self._count == 0: - self._count = self._count + 1 - msg = mimetools.Message( - StringIO("Location: http://www.cracker.com/\r\n\r\n")) - return self.parent.error( - "http", req, MockFile(), 302, "Found", msg) - else: - self.req = req - msg = mimetools.Message(StringIO("\r\n\r\n")) - return MockResponse(200, "OK", msg, "", req.get_full_url()) # cookies shouldn't leak into redirected requests from cookielib import CookieJar - from urllib2 import build_opener, HTTPHandler, HTTPError, \ - HTTPCookieProcessor from test.test_cookielib import interact_netscape cj = CookieJar() interact_netscape(cj, "http://www.example.com/", "spam=eggs") - hh = MockHTTPHandler() - cp = HTTPCookieProcessor(cj) - o = build_opener(hh, cp) + hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") + hdeh = urllib2.HTTPDefaultErrorHandler() + hrh = urllib2.HTTPRedirectHandler() + cp = urllib2.HTTPCookieProcessor(cj) + o = build_test_opener(hh, hdeh, hrh, cp) o.open("http://www.example.com/") self.assert_(not hh.req.has_header("Cookie")) @@ -659,6 +751,92 @@ class HandlerTests(unittest.TestCase): self.assertEqual([(handlers[0], "http_open")], [tup[0:2] for tup in o.calls]) + def test_basic_auth(self): + opener = OpenerDirector() + password_manager = MockPasswordManager() + auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) + realm = "ACME Widget Store" + http_handler = MockHTTPHandler( + 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) + self._test_basic_auth(opener, auth_handler, "Authorization", + realm, http_handler, password_manager, + "http://acme.example.com/protected", + "http://acme.example.com/protected", + ) + + def test_proxy_basic_auth(self): + opener = OpenerDirector() + ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) + opener.add_handler(ph) + password_manager = MockPasswordManager() + auth_handler = urllib2.ProxyBasicAuthHandler(password_manager) + realm = "ACME Networks" + http_handler = MockHTTPHandler( + 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) + self._test_basic_auth(opener, auth_handler, "Proxy-authorization", + realm, http_handler, password_manager, + "http://acme.example.com:3128/protected", + "proxy.example.com:3128", + ) + + def test_basic_and_digest_auth_handlers(self): + # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40* + # response (http://python.org/sf/1479302), where it should instead + # return None to allow another handler (especially + # HTTPBasicAuthHandler) to handle the response. + class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler): + handler_order = 400 # strictly before HTTPBasicAuthHandler + opener = OpenerDirector() + password_manager = MockPasswordManager() + digest_handler = TestDigestAuthHandler(password_manager) + basic_handler = urllib2.HTTPBasicAuthHandler(password_manager) + opener.add_handler(digest_handler) + realm = "ACME Networks" + http_handler = MockHTTPHandler( + 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) + self._test_basic_auth(opener, basic_handler, "Authorization", + realm, http_handler, password_manager, + "http://acme.example.com/protected", + "http://acme.example.com/protected", + ) + + def _test_basic_auth(self, opener, auth_handler, auth_header, + realm, http_handler, password_manager, + request_url, protected_url): + import base64, httplib + user, password = "wile", "coyote" + opener.add_handler(auth_handler) + opener.add_handler(http_handler) + + # .add_password() fed through to password manager + auth_handler.add_password(realm, request_url, user, password) + self.assertEqual(realm, password_manager.realm) + self.assertEqual(request_url, password_manager.url) + self.assertEqual(user, password_manager.user) + self.assertEqual(password, password_manager.password) + + r = opener.open(request_url) + + # should have asked the password manager for the username/password + self.assertEqual(password_manager.target_realm, realm) + self.assertEqual(password_manager.target_url, protected_url) + + # expect one request without authorization, then one with + self.assertEqual(len(http_handler.requests), 2) + self.assertFalse(http_handler.requests[0].has_header(auth_header)) + userpass = '%s:%s' % (user, password) + auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip() + self.assertEqual(http_handler.requests[1].get_header(auth_header), + auth_hdr_value) + + # if the password manager can't find a password, the handler won't + # handle the HTTP auth error + password_manager.user = password_manager.password = None + http_handler.reset() + r = opener.open(request_url) + self.assertEqual(len(http_handler.requests), 1) + self.assertFalse(http_handler.requests[0].has_header(auth_header)) + class MiscTests(unittest.TestCase): @@ -700,157 +878,15 @@ class MiscTests(unittest.TestCase): else: self.assert_(False) -class NetworkTests(unittest.TestCase): - def setUp(self): - if 0: # for debugging - import logging - logger = logging.getLogger("test_urllib2") - logger.addHandler(logging.StreamHandler()) - - def test_range (self): - req = urllib2.Request("http://www.python.org", - headers={'Range': 'bytes=20-39'}) - result = urllib2.urlopen(req) - data = result.read() - self.assertEqual(len(data), 20) - - # XXX The rest of these tests aren't very good -- they don't check much. - # They do sometimes catch some major disasters, though. - - def test_ftp(self): - urls = [ - 'ftp://www.python.org/pub/python/misc/sousa.au', - 'ftp://www.python.org/pub/tmp/blat', - 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC' - '/research-reports/00README-Legal-Rules-Regs', - ] - self._test_urls(urls, self._extra_handlers()) - - def test_gopher(self): - import warnings - warnings.filterwarnings("ignore", - "the gopherlib module is deprecated", - DeprecationWarning, - "urllib2$") - urls = [ - # Thanks to Fred for finding these! - 'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex', - 'gopher://gopher.vt.edu:10010/10/33', - ] - self._test_urls(urls, self._extra_handlers()) - - def test_file(self): - TESTFN = test_support.TESTFN - f = open(TESTFN, 'w') - try: - f.write('hi there\n') - f.close() - urls = [ - 'file:'+sanepathname2url(os.path.abspath(TESTFN)), - - # XXX bug, should raise URLError - #('file://nonsensename/etc/passwd', None, urllib2.URLError) - ('file://nonsensename/etc/passwd', None, (OSError, socket.error)) - ] - self._test_urls(urls, self._extra_handlers()) - finally: - os.remove(TESTFN) - - def test_http(self): - urls = [ - 'http://www.espn.com/', # redirect - 'http://www.python.org/Spanish/Inquistion/', - ('http://www.python.org/cgi-bin/faqw.py', - 'query=pythonistas&querytype=simple&casefold=yes&req=search', None), - 'http://www.python.org/', - ] - self._test_urls(urls, self._extra_handlers()) - - # XXX Following test depends on machine configurations that are internal - # to CNRI. Need to set up a public server with the right authentication - # configuration for test purposes. - -## def test_cnri(self): -## if socket.gethostname() == 'bitdiddle': -## localhost = 'bitdiddle.cnri.reston.va.us' -## elif socket.gethostname() == 'bitdiddle.concentric.net': -## localhost = 'localhost' -## else: -## localhost = None -## if localhost is not None: -## urls = [ -## 'file://%s/etc/passwd' % localhost, -## 'http://%s/simple/' % localhost, -## 'http://%s/digest/' % localhost, -## 'http://%s/not/found.h' % localhost, -## ] - -## bauth = HTTPBasicAuthHandler() -## bauth.add_password('basic_test_realm', localhost, 'jhylton', -## 'password') -## dauth = HTTPDigestAuthHandler() -## dauth.add_password('digest_test_realm', localhost, 'jhylton', -## 'password') - -## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) - - def _test_urls(self, urls, handlers): - import socket - import time - import logging - debug = logging.getLogger("test_urllib2").debug - - urllib2.install_opener(urllib2.build_opener(*handlers)) - - for url in urls: - if isinstance(url, tuple): - url, req, expected_err = url - else: - req = expected_err = None - debug(url) - try: - f = urllib2.urlopen(url, req) - except (IOError, socket.error, OSError), err: - debug(err) - if expected_err: - self.assert_(isinstance(err, expected_err)) - else: - buf = f.read() - f.close() - debug("read %d bytes" % len(buf)) - debug("******** next url coming up...") - time.sleep(0.1) - - def _extra_handlers(self): - handlers = [] - - handlers.append(urllib2.GopherHandler) - - cfh = urllib2.CacheFTPHandler() - cfh.setTimeout(1) - handlers.append(cfh) - -## # XXX try out some custom proxy objects too! -## def at_cnri(req): -## host = req.get_host() -## debug(host) -## if host[-18:] == '.cnri.reston.va.us': -## return True -## p = CustomProxy('http', at_cnri, 'proxy.cnri.reston.va.us') -## ph = CustomProxyHandler(p) -## handlers.append(ph) - - return handlers - def test_main(verbose=None): + from test import test_urllib2 + test_support.run_doctest(test_urllib2, verbose) test_support.run_doctest(urllib2, verbose) tests = (TrivialTests, OpenerDirectorTests, HandlerTests, MiscTests) - if test_support.is_resource_enabled('network'): - tests += (NetworkTests,) test_support.run_unittest(*tests) if __name__ == "__main__": diff --git a/Lib/test/test_urllib2net.py b/Lib/test/test_urllib2net.py index 3c23246..dc3d36d 100644 --- a/Lib/test/test_urllib2net.py +++ b/Lib/test/test_urllib2net.py @@ -2,6 +2,7 @@ import unittest from test import test_support +from test.test_urllib2 import sanepathname2url import socket import urllib2 @@ -23,6 +24,46 @@ class URLTimeoutTest(unittest.TestCase): f = urllib2.urlopen("http://www.python.org/") x = f.read() + +class AuthTests(unittest.TestCase): + """Tests urllib2 authentication features.""" + +## Disabled at the moment since there is no page under python.org which +## could be used to HTTP authentication. +# +# def test_basic_auth(self): +# import httplib +# +# test_url = "http://www.python.org/test/test_urllib2/basic_auth" +# test_hostport = "www.python.org" +# test_realm = 'Test Realm' +# test_user = 'test.test_urllib2net' +# test_password = 'blah' +# +# # failure +# try: +# urllib2.urlopen(test_url) +# except urllib2.HTTPError, exc: +# self.assertEqual(exc.code, 401) +# else: +# self.fail("urlopen() should have failed with 401") +# +# # success +# auth_handler = urllib2.HTTPBasicAuthHandler() +# auth_handler.add_password(test_realm, test_hostport, +# test_user, test_password) +# opener = urllib2.build_opener(auth_handler) +# f = opener.open('http://localhost/') +# response = urllib2.urlopen("http://www.python.org/") +# +# # The 'userinfo' URL component is deprecated by RFC 3986 for security +# # reasons, let's not implement it! (it's already implemented for proxy +# # specification strings (that is, URLs or authorities specifying a +# # proxy), so we must keep that) +# self.assertRaises(httplib.InvalidURL, +# urllib2.urlopen, "http://evil:thing@example.com") + + class urlopenNetworkTests(unittest.TestCase): """Tests urllib2.urlopen using the network. @@ -84,9 +125,145 @@ class urlopenNetworkTests(unittest.TestCase): # urllib2.urlopen, "http://www.sadflkjsasadf.com/") urllib2.urlopen, "http://www.python.invalid/") + +class OtherNetworkTests(unittest.TestCase): + def setUp(self): + if 0: # for debugging + import logging + logger = logging.getLogger("test_urllib2net") + logger.addHandler(logging.StreamHandler()) + + def test_range (self): + req = urllib2.Request("http://www.python.org", + headers={'Range': 'bytes=20-39'}) + result = urllib2.urlopen(req) + data = result.read() + self.assertEqual(len(data), 20) + + # XXX The rest of these tests aren't very good -- they don't check much. + # They do sometimes catch some major disasters, though. + + def test_ftp(self): + urls = [ + 'ftp://www.python.org/pub/python/misc/sousa.au', + 'ftp://www.python.org/pub/tmp/blat', + 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC' + '/research-reports/00README-Legal-Rules-Regs', + ] + self._test_urls(urls, self._extra_handlers()) + + def test_gopher(self): + import warnings + warnings.filterwarnings("ignore", + "the gopherlib module is deprecated", + DeprecationWarning, + "urllib2$") + urls = [ + # Thanks to Fred for finding these! + 'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex', + 'gopher://gopher.vt.edu:10010/10/33', + ] + self._test_urls(urls, self._extra_handlers()) + + def test_file(self): + TESTFN = test_support.TESTFN + f = open(TESTFN, 'w') + try: + f.write('hi there\n') + f.close() + urls = [ + 'file:'+sanepathname2url(os.path.abspath(TESTFN)), + + # XXX bug, should raise URLError + #('file://nonsensename/etc/passwd', None, urllib2.URLError) + ('file://nonsensename/etc/passwd', None, (OSError, socket.error)) + ] + self._test_urls(urls, self._extra_handlers()) + finally: + os.remove(TESTFN) + + def test_http(self): + urls = [ + 'http://www.espn.com/', # redirect + 'http://www.python.org/Spanish/Inquistion/', + ('http://www.python.org/cgi-bin/faqw.py', + 'query=pythonistas&querytype=simple&casefold=yes&req=search', None), + 'http://www.python.org/', + ] + self._test_urls(urls, self._extra_handlers()) + + # XXX Following test depends on machine configurations that are internal + # to CNRI. Need to set up a public server with the right authentication + # configuration for test purposes. + +## def test_cnri(self): +## if socket.gethostname() == 'bitdiddle': +## localhost = 'bitdiddle.cnri.reston.va.us' +## elif socket.gethostname() == 'bitdiddle.concentric.net': +## localhost = 'localhost' +## else: +## localhost = None +## if localhost is not None: +## urls = [ +## 'file://%s/etc/passwd' % localhost, +## 'http://%s/simple/' % localhost, +## 'http://%s/digest/' % localhost, +## 'http://%s/not/found.h' % localhost, +## ] + +## bauth = HTTPBasicAuthHandler() +## bauth.add_password('basic_test_realm', localhost, 'jhylton', +## 'password') +## dauth = HTTPDigestAuthHandler() +## dauth.add_password('digest_test_realm', localhost, 'jhylton', +## 'password') + +## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) + + def _test_urls(self, urls, handlers): + import socket + import time + import logging + debug = logging.getLogger("test_urllib2").debug + + urllib2.install_opener(urllib2.build_opener(*handlers)) + + for url in urls: + if isinstance(url, tuple): + url, req, expected_err = url + else: + req = expected_err = None + debug(url) + try: + f = urllib2.urlopen(url, req) + except (IOError, socket.error, OSError), err: + debug(err) + if expected_err: + self.assert_(isinstance(err, expected_err)) + else: + buf = f.read() + f.close() + debug("read %d bytes" % len(buf)) + debug("******** next url coming up...") + time.sleep(0.1) + + def _extra_handlers(self): + handlers = [] + + handlers.append(urllib2.GopherHandler) + + cfh = urllib2.CacheFTPHandler() + cfh.setTimeout(1) + handlers.append(cfh) + + return handlers + + + def test_main(): test_support.requires("network") - test_support.run_unittest(URLTimeoutTest, urlopenNetworkTests) + test_support.run_unittest(URLTimeoutTest, urlopenNetworkTests, + AuthTests, OtherNetworkTests) if __name__ == "__main__": test_main() diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py index 9634ef2..392e5fa 100644 --- a/Lib/test/test_weakref.py +++ b/Lib/test/test_weakref.py @@ -769,10 +769,54 @@ class MappingTestCase(TestBase): dict, objects = self.make_weak_keyed_dict() self.check_iters(dict) + # Test keyrefs() + refs = dict.keyrefs() + self.assertEqual(len(refs), len(objects)) + objects2 = list(objects) + for wr in refs: + ob = wr() + self.assert_(dict.has_key(ob)) + self.assert_(ob in dict) + self.assertEqual(ob.arg, dict[ob]) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + + # Test iterkeyrefs() + objects2 = list(objects) + self.assertEqual(len(list(dict.iterkeyrefs())), len(objects)) + for wr in dict.iterkeyrefs(): + ob = wr() + self.assert_(dict.has_key(ob)) + self.assert_(ob in dict) + self.assertEqual(ob.arg, dict[ob]) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + def test_weak_valued_iters(self): dict, objects = self.make_weak_valued_dict() self.check_iters(dict) + # Test valuerefs() + refs = dict.valuerefs() + self.assertEqual(len(refs), len(objects)) + objects2 = list(objects) + for wr in refs: + ob = wr() + self.assertEqual(ob, dict[ob.arg]) + self.assertEqual(ob.arg, dict[ob.arg].arg) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + + # Test itervaluerefs() + objects2 = list(objects) + self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) + for wr in dict.itervaluerefs(): + ob = wr() + self.assertEqual(ob, dict[ob.arg]) + self.assertEqual(ob.arg, dict[ob.arg].arg) + objects2.remove(ob) + self.assertEqual(len(objects2), 0) + def check_iters(self, dict): # item iterator: items = dict.items() diff --git a/Lib/test/test_with.py b/Lib/test/test_with.py index 48e00f4..5750508 100644 --- a/Lib/test/test_with.py +++ b/Lib/test/test_with.py @@ -17,15 +17,10 @@ from test.test_support import run_unittest class MockContextManager(GeneratorContextManager): def __init__(self, gen): GeneratorContextManager.__init__(self, gen) - self.context_called = False self.enter_called = False self.exit_called = False self.exit_args = None - def __context__(self): - self.context_called = True - return GeneratorContextManager.__context__(self) - def __enter__(self): self.enter_called = True return GeneratorContextManager.__enter__(self) @@ -33,7 +28,8 @@ class MockContextManager(GeneratorContextManager): def __exit__(self, type, value, traceback): self.exit_called = True self.exit_args = (type, value, traceback) - return GeneratorContextManager.__exit__(self, type, value, traceback) + return GeneratorContextManager.__exit__(self, type, + value, traceback) def mock_contextmanager(func): @@ -60,21 +56,17 @@ def mock_contextmanager_generator(): class Nested(object): - def __init__(self, *contexts): - self.contexts = contexts + def __init__(self, *managers): + self.managers = managers self.entered = None - def __context__(self): - return self - def __enter__(self): if self.entered is not None: raise RuntimeError("Context is not reentrant") self.entered = deque() vars = [] try: - for context in self.contexts: - mgr = context.__context__() + for mgr in self.managers: vars.append(mgr.__enter__()) self.entered.appendleft(mgr) except: @@ -99,17 +91,12 @@ class Nested(object): class MockNested(Nested): - def __init__(self, *contexts): - Nested.__init__(self, *contexts) - self.context_called = False + def __init__(self, *managers): + Nested.__init__(self, *managers) self.enter_called = False self.exit_called = False self.exit_args = None - def __context__(self): - self.context_called = True - return Nested.__context__(self) - def __enter__(self): self.enter_called = True return Nested.__enter__(self) @@ -126,24 +113,8 @@ class FailureTestCase(unittest.TestCase): with foo: pass self.assertRaises(NameError, fooNotDeclared) - def testContextAttributeError(self): - class LacksContext(object): - def __enter__(self): - pass - - def __exit__(self, type, value, traceback): - pass - - def fooLacksContext(): - foo = LacksContext() - with foo: pass - self.assertRaises(AttributeError, fooLacksContext) - def testEnterAttributeError(self): class LacksEnter(object): - def __context__(self): - pass - def __exit__(self, type, value, traceback): pass @@ -154,9 +125,6 @@ class FailureTestCase(unittest.TestCase): def testExitAttributeError(self): class LacksExit(object): - def __context__(self): - pass - def __enter__(self): pass @@ -192,27 +160,10 @@ class FailureTestCase(unittest.TestCase): 'with mock as (foo, None, bar):\n' ' pass') - def testContextThrows(self): - class ContextThrows(object): - def __context__(self): - raise RuntimeError("Context threw") - - def shouldThrow(): - ct = ContextThrows() - self.foo = None - with ct as self.foo: - pass - self.assertRaises(RuntimeError, shouldThrow) - self.assertEqual(self.foo, None) - def testEnterThrows(self): class EnterThrows(object): - def __context__(self): - return self - def __enter__(self): - raise RuntimeError("Context threw") - + raise RuntimeError("Enter threw") def __exit__(self, *args): pass @@ -226,8 +177,6 @@ class FailureTestCase(unittest.TestCase): def testExitThrows(self): class ExitThrows(object): - def __context__(self): - return self def __enter__(self): return def __exit__(self, *args): @@ -241,13 +190,11 @@ class ContextmanagerAssertionMixin(object): TEST_EXCEPTION = RuntimeError("test exception") def assertInWithManagerInvariants(self, mock_manager): - self.assertTrue(mock_manager.context_called) self.assertTrue(mock_manager.enter_called) self.assertFalse(mock_manager.exit_called) self.assertEqual(mock_manager.exit_args, None) def assertAfterWithManagerInvariants(self, mock_manager, exit_args): - self.assertTrue(mock_manager.context_called) self.assertTrue(mock_manager.enter_called) self.assertTrue(mock_manager.exit_called) self.assertEqual(mock_manager.exit_args, exit_args) @@ -268,7 +215,6 @@ class ContextmanagerAssertionMixin(object): raise self.TEST_EXCEPTION def assertAfterWithManagerInvariantsWithError(self, mock_manager): - self.assertTrue(mock_manager.context_called) self.assertTrue(mock_manager.enter_called) self.assertTrue(mock_manager.exit_called) self.assertEqual(mock_manager.exit_args[0], RuntimeError) @@ -472,7 +418,6 @@ class ExceptionalTestCase(unittest.TestCase, ContextmanagerAssertionMixin): # The inner statement stuff should never have been touched self.assertEqual(self.bar, None) - self.assertFalse(mock_b.context_called) self.assertFalse(mock_b.enter_called) self.assertFalse(mock_b.exit_called) self.assertEqual(mock_b.exit_args, None) @@ -506,13 +451,9 @@ class ExceptionalTestCase(unittest.TestCase, ContextmanagerAssertionMixin): self.assertRaises(StopIteration, shouldThrow) def testRaisedStopIteration2(self): - class cm (object): - def __context__(self): - return self - + class cm(object): def __enter__(self): pass - def __exit__(self, type, value, traceback): pass @@ -535,12 +476,8 @@ class ExceptionalTestCase(unittest.TestCase, ContextmanagerAssertionMixin): def testRaisedGeneratorExit2(self): class cm (object): - def __context__(self): - return self - def __enter__(self): pass - def __exit__(self, type, value, traceback): pass @@ -629,7 +566,6 @@ class AssignmentTargetTestCase(unittest.TestCase): def testMultipleComplexTargets(self): class C: - def __context__(self): return self def __enter__(self): return 1, 2, 3 def __exit__(self, t, v, tb): pass targets = {1: [0, 1, 2]} @@ -651,7 +587,6 @@ class ExitSwallowsExceptionTestCase(unittest.TestCase): def testExitTrueSwallowsException(self): class AfricanSwallow: - def __context__(self): return self def __enter__(self): pass def __exit__(self, t, v, tb): return True try: @@ -662,7 +597,6 @@ class ExitSwallowsExceptionTestCase(unittest.TestCase): def testExitFalseDoesntSwallowException(self): class EuropeanSwallow: - def __context__(self): return self def __enter__(self): pass def __exit__(self, t, v, tb): return False try: diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index 7634680..ccbc8fd 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -302,6 +302,63 @@ class CompressObjectTestCase(unittest.TestCase): dco = zlib.decompressobj() self.assertEqual(dco.flush(), "") # Returns nothing + def test_compresscopy(self): + # Test copying a compression object + data0 = HAMLET_SCENE + data1 = HAMLET_SCENE.swapcase() + c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + bufs0 = [] + bufs0.append(c0.compress(data0)) + + c1 = c0.copy() + bufs1 = bufs0[:] + + bufs0.append(c0.compress(data0)) + bufs0.append(c0.flush()) + s0 = ''.join(bufs0) + + bufs1.append(c1.compress(data1)) + bufs1.append(c1.flush()) + s1 = ''.join(bufs1) + + self.assertEqual(zlib.decompress(s0),data0+data0) + self.assertEqual(zlib.decompress(s1),data0+data1) + + def test_badcompresscopy(self): + # Test copying a compression object in an inconsistent state + c = zlib.compressobj() + c.compress(HAMLET_SCENE) + c.flush() + self.assertRaises(ValueError, c.copy) + + def test_decompresscopy(self): + # Test copying a decompression object + data = HAMLET_SCENE + comp = zlib.compress(data) + + d0 = zlib.decompressobj() + bufs0 = [] + bufs0.append(d0.decompress(comp[:32])) + + d1 = d0.copy() + bufs1 = bufs0[:] + + bufs0.append(d0.decompress(comp[32:])) + s0 = ''.join(bufs0) + + bufs1.append(d1.decompress(comp[32:])) + s1 = ''.join(bufs1) + + self.assertEqual(s0,s1) + self.assertEqual(s0,data) + + def test_baddecompresscopy(self): + # Test copying a compression object in an inconsistent state + data = zlib.compress(HAMLET_SCENE) + d = zlib.decompressobj() + d.decompress(data) + d.flush() + self.assertRaises(ValueError, d.copy) def genblock(seed, length, step=1024, generator=random): """length-byte stream of random data from a seed (in step-byte blocks).""" diff --git a/Lib/test/testtar.tar b/Lib/test/testtar.tar Binary files differindex 8fd0c50..1f4493f 100644 --- a/Lib/test/testtar.tar +++ b/Lib/test/testtar.tar diff --git a/Lib/test/threaded_import_hangers.py b/Lib/test/threaded_import_hangers.py new file mode 100644 index 0000000..b21c52f --- /dev/null +++ b/Lib/test/threaded_import_hangers.py @@ -0,0 +1,42 @@ +# This is a helper module for test_threaded_import. The test imports this +# module, and this module tries to run various Python library functions in +# their own thread, as a side effect of being imported. If the spawned +# thread doesn't complete in TIMEOUT seconds, an "appeared to hang" message +# is appended to the module-global `errors` list. That list remains empty +# if (and only if) all functions tested complete. + +TIMEOUT = 10 + +import threading + +import tempfile +import os.path + +errors = [] + +# This class merely runs a function in its own thread T. The thread importing +# this module holds the import lock, so if the function called by T tries +# to do its own imports it will block waiting for this module's import +# to complete. +class Worker(threading.Thread): + def __init__(self, function, args): + threading.Thread.__init__(self) + self.function = function + self.args = args + + def run(self): + self.function(*self.args) + +for name, func, args in [ + # Bug 147376: TemporaryFile hung on Windows, starting in Python 2.4. + ("tempfile.TemporaryFile", tempfile.TemporaryFile, ()), + + # The real cause for bug 147376: ntpath.abspath() caused the hang. + ("os.path.abspath", os.path.abspath, ('.',)), + ]: + + t = Worker(func, args) + t.start() + t.join(TIMEOUT) + if t.isAlive(): + errors.append("%s appeared to hang" % name) |