diff options
Diffstat (limited to 'Lib/test')
-rwxr-xr-x | Lib/test/regrtest.py | 160 | ||||
-rw-r--r-- | Lib/test/test_asyncore.py | 406 | ||||
-rw-r--r-- | Lib/test/test_format.py | 15 | ||||
-rw-r--r-- | Lib/test/test_grammar.py | 14 | ||||
-rw-r--r-- | Lib/test/test_mailbox.py | 15 | ||||
-rw-r--r-- | Lib/test/test_parser.py | 6 | ||||
-rw-r--r-- | Lib/test/test_urllib2.py | 6 | ||||
-rw-r--r-- | Lib/test/test_xmlrpc.py | 135 | ||||
-rw-r--r-- | Lib/test/test_zipfile.py | 114 |
9 files changed, 730 insertions, 141 deletions
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 8c4b5e5..de7807c 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -335,12 +335,12 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, generate=False, tests = map(removepy, tests) stdtests = STDTESTS[:] - nottests = NOTTESTS[:] + nottests = NOTTESTS.copy() if exclude: for arg in args: if arg in stdtests: stdtests.remove(arg) - nottests[:0] = args + nottests.add(arg) args = [] tests = tests or args or findtests(testdir, stdtests, nottests) if single: @@ -478,14 +478,14 @@ STDTESTS = [ 'test_unittest', 'test_doctest', 'test_doctest2', - ] +] -NOTTESTS = [ +NOTTESTS = { 'test_support', 'test_future1', 'test_future2', 'test_future3', - ] +} def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): """Return a list of all applicable test modules.""" @@ -818,12 +818,14 @@ def printlist(x, width=70, indent=4): # test_timeout # Controlled by test_timeout.skip_expected. Requires the network # resource and a socket module. +# +# Tests that are expected to be skipped everywhere except on one platform +# are also handled separately. _expectations = { 'win32': """ test__locale - test_applesingle test_bsddb3 test_commands test_crypt @@ -836,9 +838,7 @@ _expectations = { test_grp test_ioctl test_largefile - test_linuxaudiodev test_mhlib - test_nis test_openpty test_ossaudiodev test_poll @@ -847,24 +847,16 @@ _expectations = { test_pwd test_resource test_signal - test_sunaudiodev test_threadsignals test_wait3 test_wait4 """, 'linux2': """ - test_applesingle test_curses test_dl test_largefile - test_linuxaudiodev - test_nis - test_ntpath test_ossaudiodev - test_sqlite - test_startfile - test_sunaudiodev """, 'mac': """ @@ -882,11 +874,8 @@ _expectations = { test_grp test_ioctl test_largefile - test_linuxaudiodev test_locale test_mmap - test_nis - test_ntpath test_openpty test_ossaudiodev test_poll @@ -896,69 +885,45 @@ _expectations = { test_pwd test_resource test_signal - test_sqlite - test_startfile - test_sunaudiodev test_sundry test_tarfile """, 'unixware7': """ - test_applesingle test_bsddb test_dl test_largefile - test_linuxaudiodev test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_sax - test_startfile - test_sqlite - test_sunaudiodev test_sundry """, 'openunix8': """ - test_applesingle test_bsddb test_dl test_largefile - test_linuxaudiodev test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_sax - test_sqlite - test_startfile - test_sunaudiodev test_sundry """, 'sco_sv3': """ - test_applesingle test_asynchat test_bsddb test_dl test_fork1 test_gettext test_largefile - test_linuxaudiodev test_locale test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_queue test_sax - test_sqlite - test_startfile - test_sunaudiodev test_sundry test_thread test_threaded_import @@ -967,7 +932,6 @@ _expectations = { """, 'riscos': """ - test_applesingle test_asynchat test_atexit test_bsddb @@ -981,18 +945,12 @@ _expectations = { test_gdbm test_grp test_largefile - test_linuxaudiodev test_locale test_mmap - test_nis - test_ntpath test_openpty test_poll test_pty test_pwd - test_sqlite - test_startfile - test_sunaudiodev test_sundry test_thread test_threaded_import @@ -1001,209 +959,135 @@ _expectations = { """, 'darwin': """ + test__locale + test_bsddb + test_bsddb3 + test_curses test_gdbm test_largefile - test_linuxaudiodev test_locale - test_nis test_ossaudiodev - test_startfile - test_sunaudiodev + test_poll """, 'sunos5': """ - test_applesingle test_bsddb test_curses test_dbm test_gdbm test_gzip - test_linuxaudiodev test_openpty - test_sqlite - test_startfile test_zipfile test_zlib """, 'hp-ux11': """ - test_applesingle test_bsddb test_curses test_dl test_gdbm test_gzip test_largefile - test_linuxaudiodev test_locale test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_sax - test_sqlite - test_startfile - test_sunaudiodev test_zipfile test_zlib """, 'atheos': """ - test_applesingle test_curses test_dl test_gdbm test_largefile - test_linuxaudiodev test_locale test_mhlib test_mmap - test_nis test_poll test_resource - test_sqlite - test_startfile - test_sunaudiodev """, 'cygwin': """ - test_applesingle test_bsddb3 test_curses test_dbm test_ioctl test_largefile - test_linuxaudiodev test_locale - test_nis test_ossaudiodev test_socketserver - test_sqlite - test_sunaudiodev """, 'os2emx': """ - test_applesingle test_audioop test_bsddb3 test_commands test_curses test_dl test_largefile - test_linuxaudiodev test_mhlib test_mmap - test_nis test_openpty test_ossaudiodev test_pty test_resource test_signal - test_sqlite - test_startfile - test_sunaudiodev """, 'freebsd4': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_gdbm - test_linuxaudiodev test_locale - test_macostools - test_nis test_ossaudiodev test_pep277 - test_plistlib test_pty - test_scriptpackages test_socket_ssl test_socketserver - test_sqlite - test_startfile - test_sunaudiodev test_tcl test_timeout - test_unicode_file test_urllibnet - test_winreg - test_winsound """, 'aix5': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_bz2 test_dl test_gdbm test_gzip - test_linuxaudiodev - test_macostools - test_nis test_ossaudiodev - test_sqlite - test_startfile - test_sunaudiodev test_tcl - test_winreg - test_winsound test_zipimport test_zlib """, 'openbsd3': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_ctypes test_dl test_gdbm - test_linuxaudiodev test_locale - test_macostools - test_nis test_normalization test_ossaudiodev test_pep277 - test_plistlib - test_scriptpackages test_tcl - test_sqlite - test_startfile - test_sunaudiodev - test_unicode_file - test_winreg - test_winsound """, 'netbsd3': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_ctypes test_curses test_dl test_gdbm - test_linuxaudiodev test_locale - test_macostools - test_nis test_ossaudiodev test_pep277 - test_sqlite - test_startfile - test_sunaudiodev test_tcl - test_unicode_file - test_winreg - test_winsound """, } _expectations['freebsd5'] = _expectations['freebsd4'] @@ -1221,6 +1105,9 @@ class _ExpectedSkips: s = _expectations[sys.platform] self.expected = set(s.split()) + # expected to be skipped on every platform, even Linux + self.expected.add('test_linuxaudiodev') + if not os.path.supports_unicode_filenames: self.expected.add('test_pep277') @@ -1232,20 +1119,23 @@ class _ExpectedSkips: if not sys.platform in ("mac", "darwin"): MAC_ONLY = ["test_macostools", "test_aepack", - "test_plistlib", "test_scriptpackages"] + "test_plistlib", "test_scriptpackages", + "test_applesingle"] for skip in MAC_ONLY: self.expected.add(skip) if sys.platform != "win32": + # test_sqlite is only reliable on Windows where the library + # is distributed with Python WIN_ONLY = ["test_unicode_file", "test_winreg", - "test_winsound"] + "test_winsound", "test_startfile", + "test_sqlite"] for skip in WIN_ONLY: self.expected.add(skip) - if sys.platform != 'irix': - IRIX_ONLY =["test_imageop"] - for skip in IRIX_ONLY: - self.expected.add(skip) + if sys.platform != 'sunos5': + self.expected.add('test_sunaudiodev') + self.expected.add('test_nis') self.valid = True diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py new file mode 100644 index 0000000..7602b9f --- /dev/null +++ b/Lib/test/test_asyncore.py @@ -0,0 +1,406 @@ +import asyncore +import unittest +import select +import os +import socket +import threading +import sys +import time + +from test import test_support +from test.test_support import TESTFN, run_unittest, unlink +from StringIO import StringIO + +HOST = "127.0.0.1" +PORT = 54329 + +class dummysocket: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + + def fileno(self): + return 42 + +class dummychannel: + def __init__(self): + self.socket = dummysocket() + +class exitingdummy: + def __init__(self): + pass + + def handle_read_event(self): + raise asyncore.ExitNow() + + handle_write_event = handle_read_event + handle_expt_event = handle_read_event + +class crashingdummy: + def __init__(self): + self.error_handled = False + + def handle_read_event(self): + raise Exception() + + handle_write_event = handle_read_event + handle_expt_event = handle_read_event + + def handle_error(self): + self.error_handled = True + +# used when testing senders; just collects what it gets until newline is sent +def capture_server(evt, buf): + serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv.settimeout(3) + serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + serv.bind(("", PORT)) + serv.listen(5) + try: + conn, addr = serv.accept() + except socket.timeout: + pass + else: + n = 200 + while n > 0: + data = conn.recv(10) + # keep everything except for the newline terminator + buf.write(data.replace('\n', '')) + if '\n' in data: + break + n -= 1 + time.sleep(0.01) + + conn.close() + finally: + serv.close() + evt.set() + + +class HelperFunctionTests(unittest.TestCase): + def test_readwriteexc(self): + # Check exception handling behavior of read, write and _exception + + # check that ExitNow exceptions in the object handler method + # bubbles all the way up through asyncore read/write/_exception calls + tr1 = exitingdummy() + self.assertRaises(asyncore.ExitNow, asyncore.read, tr1) + self.assertRaises(asyncore.ExitNow, asyncore.write, tr1) + self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1) + + # check that an exception other than ExitNow in the object handler + # method causes the handle_error method to get called + tr2 = crashingdummy() + asyncore.read(tr2) + self.assertEqual(tr2.error_handled, True) + + tr2 = crashingdummy() + asyncore.write(tr2) + self.assertEqual(tr2.error_handled, True) + + tr2 = crashingdummy() + asyncore._exception(tr2) + self.assertEqual(tr2.error_handled, True) + +## Commented out these tests because test a non-documented function +## (which is actually public, why it's not documented?). Anyway, the +## tests *and* the function uses constants in the select module that +## are not present in Windows systems (see this thread: +## http://mail.python.org/pipermail/python-list/2001-October/109973.html) +## Note even that these constants are mentioned in the select +## documentation, as a parameter of "poll" method "register", but are +## not explicit declared as constants of the module. +## . Facundo Batista +## +## def test_readwrite(self): +## # Check that correct methods are called by readwrite() +## +## class testobj: +## def __init__(self): +## self.read = False +## self.write = False +## self.expt = False +## +## def handle_read_event(self): +## self.read = True +## +## def handle_write_event(self): +## self.write = True +## +## def handle_expt_event(self): +## self.expt = True +## +## def handle_error(self): +## self.error_handled = True +## +## for flag in (select.POLLIN, select.POLLPRI): +## tobj = testobj() +## self.assertEqual(tobj.read, False) +## asyncore.readwrite(tobj, flag) +## self.assertEqual(tobj.read, True) +## +## # check that ExitNow exceptions in the object handler method +## # bubbles all the way up through asyncore readwrite call +## tr1 = exitingdummy() +## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) +## +## # check that an exception other than ExitNow in the object handler +## # method causes the handle_error method to get called +## tr2 = crashingdummy() +## asyncore.readwrite(tr2, flag) +## self.assertEqual(tr2.error_handled, True) +## +## tobj = testobj() +## self.assertEqual(tobj.write, False) +## asyncore.readwrite(tobj, select.POLLOUT) +## self.assertEqual(tobj.write, True) +## +## # check that ExitNow exceptions in the object handler method +## # bubbles all the way up through asyncore readwrite call +## tr1 = exitingdummy() +## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, +## select.POLLOUT) +## +## # check that an exception other than ExitNow in the object handler +## # method causes the handle_error method to get called +## tr2 = crashingdummy() +## asyncore.readwrite(tr2, select.POLLOUT) +## self.assertEqual(tr2.error_handled, True) +## +## for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL): +## tobj = testobj() +## self.assertEqual(tobj.expt, False) +## asyncore.readwrite(tobj, flag) +## self.assertEqual(tobj.expt, True) +## +## # check that ExitNow exceptions in the object handler method +## # bubbles all the way up through asyncore readwrite calls +## tr1 = exitingdummy() +## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) +## +## # check that an exception other than ExitNow in the object handler +## # method causes the handle_error method to get called +## tr2 = crashingdummy() +## asyncore.readwrite(tr2, flag) +## self.assertEqual(tr2.error_handled, True) + + def test_closeall(self): + self.closeall_check(False) + + def test_closeall_default(self): + self.closeall_check(True) + + def closeall_check(self, usedefault): + # Check that close_all() closes everything in a given map + + l = [] + testmap = {} + for i in range(10): + c = dummychannel() + l.append(c) + self.assertEqual(c.socket.closed, False) + testmap[i] = c + + if usedefault: + socketmap = asyncore.socket_map + try: + asyncore.socket_map = testmap + asyncore.close_all() + finally: + testmap, asyncore.socket_map = asyncore.socket_map, socketmap + else: + asyncore.close_all(testmap) + + self.assertEqual(len(testmap), 0) + + for c in l: + self.assertEqual(c.socket.closed, True) + + def test_compact_traceback(self): + try: + raise Exception("I don't like spam!") + except: + real_t, real_v, real_tb = sys.exc_info() + r = asyncore.compact_traceback() + else: + self.fail("Expected exception") + + (f, function, line), t, v, info = r + self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py') + self.assertEqual(function, 'test_compact_traceback') + self.assertEqual(t, real_t) + self.assertEqual(v, real_v) + self.assertEqual(info, '[%s|%s|%s]' % (f, function, line)) + + +class DispatcherTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + asyncore.close_all() + + def test_basic(self): + d = asyncore.dispatcher() + self.assertEqual(d.readable(), True) + self.assertEqual(d.writable(), True) + + def test_repr(self): + d = asyncore.dispatcher() + self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d)) + + def test_log(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log() (to stderr) + fp = StringIO() + stderr = sys.stderr + l1 = "Lovely spam! Wonderful spam!" + l2 = "I don't like spam!" + try: + sys.stderr = fp + d.log(l1) + d.log(l2) + finally: + sys.stderr = stderr + + lines = fp.getvalue().splitlines() + self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2]) + + def test_log_info(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log_info() (to stdout via print) + fp = StringIO() + stdout = sys.stdout + l1 = "Have you got anything without spam?" + l2 = "Why can't she have egg bacon spam and sausage?" + l3 = "THAT'S got spam in it!" + try: + sys.stdout = fp + d.log_info(l1, 'EGGS') + d.log_info(l2) + d.log_info(l3, 'SPAM') + finally: + sys.stdout = stdout + + lines = fp.getvalue().splitlines() + if __debug__: + expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] + else: + expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3] + + self.assertEquals(lines, expected) + + def test_unhandled(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log_info() (to stdout via print) + fp = StringIO() + stdout = sys.stdout + try: + sys.stdout = fp + d.handle_expt() + d.handle_read() + d.handle_write() + d.handle_connect() + d.handle_accept() + finally: + sys.stdout = stdout + + lines = fp.getvalue().splitlines() + expected = ['warning: unhandled exception', + 'warning: unhandled read event', + 'warning: unhandled write event', + 'warning: unhandled connect event', + 'warning: unhandled accept event'] + self.assertEquals(lines, expected) + + + +class dispatcherwithsend_noread(asyncore.dispatcher_with_send): + def readable(self): + return False + + def handle_connect(self): + pass + +class DispatcherWithSendTests(unittest.TestCase): + usepoll = False + + def setUp(self): + pass + + def tearDown(self): + asyncore.close_all() + + def test_send(self): + self.evt = threading.Event() + cap = StringIO() + threading.Thread(target=capture_server, args=(self.evt,cap)).start() + time.sleep(1) # Give server time to initialize + + data = "Suppose there isn't a 16-ton weight?"*5 + d = dispatcherwithsend_noread() + d.create_socket(socket.AF_INET, socket.SOCK_STREAM) + d.connect((HOST, PORT)) + d.send(data) + d.send('\n') + + n = 1000 + while d.out_buffer and n > 0: + asyncore.poll() + n -= 1 + + self.evt.wait() + + self.assertEqual(cap.getvalue(), data) + + +class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): + usepoll = True + +if hasattr(asyncore, 'file_wrapper'): + class FileWrapperTest(unittest.TestCase): + def setUp(self): + self.d = "It's not dead, it's sleeping!" + file(TESTFN, 'w').write(self.d) + + def tearDown(self): + unlink(TESTFN) + + def test_recv(self): + fd = os.open(TESTFN, os.O_RDONLY) + w = asyncore.file_wrapper(fd) + + self.assertEqual(w.fd, fd) + self.assertEqual(w.fileno(), fd) + self.assertEqual(w.recv(13), "It's not dead") + self.assertEqual(w.read(6), ", it's") + w.close() + self.assertRaises(OSError, w.read, 1) + + def test_send(self): + d1 = "Come again?" + d2 = "I want to buy some cheese." + fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) + w = asyncore.file_wrapper(fd) + + w.write(d1) + w.send(d2) + w.close() + self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) + + +def test_main(): + tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, + DispatcherWithSendTests_UsePoll] + if hasattr(asyncore, 'file_wrapper'): + tests.append(FileWrapperTest) + + run_unittest(*tests) + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py index 085768d..ade3132 100644 --- a/Lib/test/test_format.py +++ b/Lib/test/test_format.py @@ -9,6 +9,7 @@ maxsize = MAX_Py_ssize_t # test on unicode strings as well overflowok = 1 +overflowrequired = 0 def testformat(formatstr, args, output=None): if verbose: @@ -25,7 +26,11 @@ def testformat(formatstr, args, output=None): if verbose: print('overflow (this is fine)') else: - if output and result != output: + if overflowrequired: + if verbose: + print('no') + print("overflow expected on %r %% %r" % (formatstr, args)) + elif output and result != output: if verbose: print('no') print("%r %% %r == %r != %r" %\ @@ -56,6 +61,14 @@ testboth("%#.*g", (110, -1.e+100/3.)) # test some ridiculously large precision, expect overflow testboth('%12.*f', (123456, 1.0)) +# check for internal overflow validation on length of precision +overflowrequired = 1 +testboth("%#.*g", (110, -1.e+100/3.)) +testboth("%#.*G", (110, -1.e+100/3.)) +testboth("%#.*f", (110, -1.e+100/3.)) +testboth("%#.*F", (110, -1.e+100/3.)) +overflowrequired = 0 + # Formatting of long integers. Overflow is not ok overflowok = 0 testboth("%x", 10, "a") diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py index 8f45382..bfc77fe 100644 --- a/Lib/test/test_grammar.py +++ b/Lib/test/test_grammar.py @@ -697,6 +697,20 @@ class GrammarTests(unittest.TestCase): def meth2(self, arg): pass def meth3(self, a1, a2): pass + # decorator: '@' dotted_name [ '(' [arglist] ')' ] NEWLINE + # decorators: decorator+ + # decorated: decorators (classdef | funcdef) + def class_decorator(x): return x + @class_decorator + class G: pass + + def testDictcomps(self): + # dictorsetmaker: ( (test ':' test (comp_for | + # (',' test ':' test)* [','])) | + # (test (comp_for | (',' test)* [','])) ) + nums = [1, 2, 3] + self.assertEqual({i:i+1 for i in nums}, {1: 2, 2: 3, 3: 4}) + def testListcomps(self): # list comprehension tests nums = [1, 2, 3, 4, 5] diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 9d50225..e5930d8 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -427,8 +427,8 @@ class TestMailboxSuperclass(TestBase): self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) self.assertRaises(NotImplementedError, lambda: box.keys()) self.assertRaises(NotImplementedError, lambda: box.keys()) - self.assertRaises(NotImplementedError, lambda: box.values().next()) - self.assertRaises(NotImplementedError, lambda: box.__iter__().next()) + self.assertRaises(NotImplementedError, lambda: box.values().__next__()) + self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) self.assertRaises(NotImplementedError, lambda: box.values()) self.assertRaises(NotImplementedError, lambda: box.items().next()) self.assertRaises(NotImplementedError, lambda: box.items()) @@ -679,7 +679,18 @@ class TestMaildir(TestMailbox): folder1_alias = box.get_folder('folder1') self.assert_(folder1_alias._factory is dummy_factory) + def test_directory_in_folder (self): + # Test that mailboxes still work if there's a stray extra directory + # in a folder. + for i in range(10): + self._box.add(mailbox.Message(_sample_message)) + + # Create a stray directory + os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) + # Check that looping still works with the directory present. + for msg in self._box: + pass class _TestMboxMMDF(TestMailbox): diff --git a/Lib/test/test_parser.py b/Lib/test/test_parser.py index ddb58b5..dbf6a92 100644 --- a/Lib/test/test_parser.py +++ b/Lib/test/test_parser.py @@ -443,6 +443,12 @@ class CompileTestCase(unittest.TestCase): st = parser.suite('1 = 3 + 4') self.assertRaises(SyntaxError, parser.compilest, st) + def test_compile_badunicode(self): + st = parser.suite('a = u"\U12345678"') + self.assertRaises(SyntaxError, parser.compilest, st) + st = parser.suite('a = u"\u1"') + self.assertRaises(SyntaxError, parser.compilest, st) + def test_main(): test_support.run_unittest( RoundtripLegalSyntaxTestCase, diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 99e3047..488e9e2 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -380,6 +380,12 @@ class MockPasswordManager: class OpenerDirectorTests(unittest.TestCase): + def test_add_non_handler(self): + class NonHandler(object): + pass + self.assertRaises(TypeError, + OpenerDirector().add_handler, NonHandler()) + def test_badly_named_methods(self): # test work-around for three methods that accidentally follow the # naming conventions for handler methods diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 791cca7..0b8a963 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -1,5 +1,7 @@ +import base64 import datetime import sys +import time import unittest import xmlrpclib from test import test_support @@ -18,6 +20,10 @@ alist = [{'astring': 'foo@bar.baz.spam', (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 'datetime3': xmlrpclib.DateTime( datetime.datetime(2005, 2, 10, 11, 41, 23)), + 'datetime4': xmlrpclib.DateTime( + datetime.date(2005, 2, 10)), + 'datetime5': xmlrpclib.DateTime( + datetime.time(11, 41, 23)), }] class XMLRPCTestCase(unittest.TestCase): @@ -94,11 +100,37 @@ class XMLRPCTestCase(unittest.TestCase): def test_dump_bad_dict(self): self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) + def test_dump_recursive_seq(self): + l = [1,2,3] + t = [3,4,5,l] + l.append(t) + self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) + + def test_dump_recursive_dict(self): + d = {'1':1, '2':1} + t = {'3':3, 'd':d} + d['t'] = t + self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) + def test_dump_big_int(self): if sys.maxint > 2**31-1: self.assertRaises(OverflowError, xmlrpclib.dumps, (int(2**34),)) + xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) + self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,)) + self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,)) + + def dummy_write(s): + pass + + m = xmlrpclib.Marshaller() + m.dump_int(xmlrpclib.MAXINT, dummy_write) + m.dump_int(xmlrpclib.MININT, dummy_write) + self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write) + self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write) + + def test_dump_none(self): value = alist + [None] arg1 = (alist + [None],) @@ -146,8 +178,109 @@ class XMLRPCTestCase(unittest.TestCase): self.assert_(isinstance(items[0][0], str)) self.assert_(isinstance(items[0][1], str)) + +class HelperTestCase(unittest.TestCase): + def test_escape(self): + self.assertEqual(xmlrpclib.escape("a&b"), "a&b") + self.assertEqual(xmlrpclib.escape("a<b"), "a<b") + self.assertEqual(xmlrpclib.escape("a>b"), "a>b") + +class FaultTestCase(unittest.TestCase): + def test_repr(self): + f = xmlrpclib.Fault(42, 'Test Fault') + self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") + self.assertEqual(repr(f), str(f)) + + def test_dump_fault(self): + f = xmlrpclib.Fault(42, 'Test Fault') + s = xmlrpclib.dumps((f,)) + (newf,), m = xmlrpclib.loads(s) + self.assertEquals(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) + self.assertEquals(m, None) + + s = xmlrpclib.Marshaller().dumps(f) + self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) + + +class DateTimeTestCase(unittest.TestCase): + def test_default(self): + t = xmlrpclib.DateTime() + + def test_time(self): + d = 1181399930.036952 + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) + + def test_time_tuple(self): + d = (2007,6,9,10,38,50,5,160,0) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), '20070609T10:38:50') + + def test_time_struct(self): + d = time.localtime(1181399930.036952) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) + + def test_datetime_datetime(self): + d = datetime.datetime(2007,1,2,3,4,5) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), '20070102T03:04:05') + + def test_datetime_date(self): + d = datetime.date(2007,9,8) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), '20070908T00:00:00') + + def test_datetime_time(self): + d = datetime.time(13,17,19) + # allow for date rollover by checking today's or tomorrow's dates + dd1 = datetime.datetime.now().date() + dd2 = dd1 + datetime.timedelta(days=1) + vals = (dd1.strftime('%Y%m%dT13:17:19'), + dd2.strftime('%Y%m%dT13:17:19')) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t) in vals, True) + + def test_repr(self): + d = datetime.datetime(2007,1,2,3,4,5) + t = xmlrpclib.DateTime(d) + val ="<DateTime '20070102T03:04:05' at %x>" % id(t) + self.assertEqual(repr(t), val) + + def test_decode(self): + d = ' 20070908T07:11:13 ' + t1 = xmlrpclib.DateTime() + t1.decode(d) + tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) + self.assertEqual(t1, tref) + + t2 = xmlrpclib._datetime(d) + self.assertEqual(t1, tref) + +class BinaryTestCase(unittest.TestCase): + def test_default(self): + t = xmlrpclib.Binary() + self.assertEqual(str(t), '') + + def test_string(self): + d = '\x01\x02\x03abc123\xff\xfe' + t = xmlrpclib.Binary(d) + self.assertEqual(str(t), d) + + def test_decode(self): + d = '\x01\x02\x03abc123\xff\xfe' + de = base64.encodestring(d) + t1 = xmlrpclib.Binary() + t1.decode(de) + self.assertEqual(str(t1), d) + + t2 = xmlrpclib._binary(de) + self.assertEqual(str(t2), d) + + def test_main(): - test_support.run_unittest(XMLRPCTestCase) + test_support.run_unittest(XMLRPCTestCase, HelperTestCase, + DateTimeTestCase, BinaryTestCase, FaultTestCase) if __name__ == "__main__": diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 197170a..eda6863 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -12,7 +12,7 @@ import test.test_support as support from test.test_support import TESTFN, run_unittest TESTFN2 = TESTFN + "2" -FIXEDTEST_SIZE = 10 +FIXEDTEST_SIZE = 1000 class TestsWithSourceFile(unittest.TestCase): def setUp(self): @@ -232,6 +232,63 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(zipfp.namelist(), ["absolute"]) zipfp.close() + def testAppendToZipFile(self): + # Test appending to an existing zipfile + zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) + zipfp.write(TESTFN, TESTFN) + zipfp.close() + zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) + zipfp.writestr("strfile", self.data) + self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) + zipfp.close() + + def testAppendToNonZipFile(self): + # Test appending to an existing file that is not a zipfile + # NOTE: this test fails if len(d) < 22 because of the first + # line "fpin.seek(-22, 2)" in _EndRecData + d = 'I am not a ZipFile!'*10 + f = file(TESTFN2, 'wb') + f.write(d) + f.close() + zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) + zipfp.write(TESTFN, TESTFN) + zipfp.close() + + f = file(TESTFN2, 'rb') + f.seek(len(d)) + zipfp = zipfile.ZipFile(f, "r") + self.assertEqual(zipfp.namelist(), [TESTFN]) + zipfp.close() + f.close() + + def test_WriteDefaultName(self): + # Check that calling ZipFile.write without arcname specified produces the expected result + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.write(TESTFN) + self.assertEqual(zipfp.read(TESTFN), file(TESTFN).read()) + zipfp.close() + + def test_PerFileCompression(self): + # Check that files within a Zip archive can have different compression options + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) + zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) + sinfo = zipfp.getinfo('storeme') + dinfo = zipfp.getinfo('deflateme') + self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) + self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) + zipfp.close() + + def test_WriteToReadonly(self): + # Check that trying to call write() on a readonly ZipFile object + # raises a RuntimeError + zipf = zipfile.ZipFile(TESTFN2, mode="w") + zipf.writestr("somefile.txt", "bogus") + zipf.close() + zipf = zipfile.ZipFile(TESTFN2, mode="r") + self.assertRaises(RuntimeError, zipf.write, TESTFN) + zipf.close() + def tearDown(self): os.remove(TESTFN) os.remove(TESTFN2) @@ -349,7 +406,6 @@ class TestZip64InSmallFiles(unittest.TestCase): self.assertEqual(zipfp.namelist(), ["absolute"]) zipfp.close() - def tearDown(self): zipfile.ZIP64_LIMIT = self._limit os.remove(TESTFN) @@ -420,6 +476,11 @@ class PyZipFileTests(unittest.TestCase): finally: shutil.rmtree(TESTFN2) + def testWriteNonPyfile(self): + zipfp = zipfile.PyZipFile(TemporaryFile(), "w") + file(TESTFN, 'w').write('most definitely not a python file') + self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) + os.remove(TESTFN) class OtherTests(unittest.TestCase): @@ -501,7 +562,56 @@ class OtherTests(unittest.TestCase): # a RuntimeError, and so should calling .testzip. An earlier # version of .testzip would swallow this exception (and any other) # and report that the first file in the archive was corrupt. + self.assertRaises(RuntimeError, zipf.read, "foo.txt") + self.assertRaises(RuntimeError, zipf.open, "foo.txt") self.assertRaises(RuntimeError, zipf.testzip) + self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus") + file(TESTFN, 'w').write('zipfile test data') + self.assertRaises(RuntimeError, zipf.write, TESTFN) + + def test_BadConstructorMode(self): + # Check that bad modes passed to ZipFile constructor are caught + self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q") + + def test_BadOpenMode(self): + # Check that bad modes passed to ZipFile.open are caught + zipf = zipfile.ZipFile(TESTFN, mode="w") + zipf.writestr("foo.txt", "O, for a Muse of Fire!") + zipf.close() + zipf = zipfile.ZipFile(TESTFN, mode="r") + # read the data to make sure the file is there + zipf.read("foo.txt") + self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q") + zipf.close() + + def test_Read0(self): + # Check that calling read(0) on a ZipExtFile object returns an empty + # string and doesn't advance file pointer + zipf = zipfile.ZipFile(TESTFN, mode="w") + zipf.writestr("foo.txt", "O, for a Muse of Fire!") + # read the data to make sure the file is there + f = zipf.open("foo.txt") + for i in range(FIXEDTEST_SIZE): + self.assertEqual(f.read(0), '') + + self.assertEqual(f.read(), "O, for a Muse of Fire!") + zipf.close() + + def test_OpenNonexistentItem(self): + # Check that attempting to call open() for an item that doesn't + # exist in the archive raises a RuntimeError + zipf = zipfile.ZipFile(TESTFN, mode="w") + self.assertRaises(KeyError, zipf.open, "foo.txt", "r") + + def test_BadCompressionMode(self): + # Check that bad compression methods passed to ZipFile.open are caught + self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1) + + def test_NullByteInFilename(self): + # Check that a filename containing a null byte is properly terminated + zipf = zipfile.ZipFile(TESTFN, mode="w") + zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!") + self.assertEqual(zipf.namelist(), ['foo.txt']) def tearDown(self): support.unlink(TESTFN) |