diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/ctypes/__init__.py | 8 | ||||
-rw-r--r-- | Lib/ctypes/test/test_find.py | 26 | ||||
-rw-r--r-- | Lib/ctypes/test/test_win32.py | 9 | ||||
-rw-r--r-- | Lib/email/test/test_email.py | 12 | ||||
-rw-r--r-- | Lib/email/test/test_email_renamed.py | 11 | ||||
-rw-r--r-- | Lib/idlelib/FileList.py | 2 | ||||
-rw-r--r-- | Lib/idlelib/MultiCall.py | 28 | ||||
-rw-r--r-- | Lib/idlelib/PyShell.py | 2 | ||||
-rw-r--r-- | Lib/idlelib/WidgetRedirector.py | 2 | ||||
-rw-r--r-- | Lib/idlelib/idlever.py | 2 | ||||
-rw-r--r-- | Lib/idlelib/macosxSupport.py | 49 | ||||
-rw-r--r-- | Lib/inspect.py | 1 | ||||
-rw-r--r-- | Lib/lib-tk/Tkinter.py | 2 | ||||
-rwxr-xr-x | Lib/mailbox.py | 8 | ||||
-rwxr-xr-x | Lib/test/regrtest.py | 160 | ||||
-rw-r--r-- | Lib/test/test_asyncore.py | 406 | ||||
-rw-r--r-- | Lib/test/test_format.py | 15 | ||||
-rw-r--r-- | Lib/test/test_grammar.py | 14 | ||||
-rw-r--r-- | Lib/test/test_mailbox.py | 15 | ||||
-rw-r--r-- | Lib/test/test_parser.py | 6 | ||||
-rw-r--r-- | Lib/test/test_urllib2.py | 6 | ||||
-rw-r--r-- | Lib/test/test_xmlrpc.py | 135 | ||||
-rw-r--r-- | Lib/test/test_zipfile.py | 114 | ||||
-rw-r--r-- | Lib/urllib2.py | 4 | ||||
-rw-r--r-- | Lib/zipfile.py | 24 |
25 files changed, 859 insertions, 202 deletions
diff --git a/Lib/ctypes/__init__.py b/Lib/ctypes/__init__.py index f62c345..2cddb11 100644 --- a/Lib/ctypes/__init__.py +++ b/Lib/ctypes/__init__.py @@ -223,6 +223,14 @@ _check_size(c_char) class c_char_p(_SimpleCData): _type_ = "z" + if _os.name == "nt": + def __repr__(self): + if not windll.kernel32.IsBadStringPtrA(self, -1): + return "%s(%r)" % (self.__class__.__name__, self.value) + return "%s(%s)" % (self.__class__.__name__, cast(self, c_void_p).value) + else: + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, cast(self, c_void_p).value) _check_size(c_char_p, "P") class c_void_p(_SimpleCData): diff --git a/Lib/ctypes/test/test_find.py b/Lib/ctypes/test/test_find.py index 71cfe93..9f61cdb 100644 --- a/Lib/ctypes/test/test_find.py +++ b/Lib/ctypes/test/test_find.py @@ -7,25 +7,21 @@ from ctypes.test import is_resource_enabled if sys.platform == "win32": lib_gl = find_library("OpenGL32") lib_glu = find_library("Glu32") - lib_glut = find_library("glut32") lib_gle = None elif sys.platform == "darwin": lib_gl = lib_glu = find_library("OpenGL") - lib_glut = find_library("GLUT") lib_gle = None else: lib_gl = find_library("GL") lib_glu = find_library("GLU") - lib_glut = find_library("glut") lib_gle = find_library("gle") ## print, for debugging if is_resource_enabled("printing"): - if lib_gl or lib_glu or lib_glut or lib_gle: + if lib_gl or lib_glu or lib_gle: print("OpenGL libraries:") for item in (("GL", lib_gl), ("GLU", lib_glu), - ("glut", lib_glut), ("gle", lib_gle)): print("\t", item) @@ -33,24 +29,11 @@ if is_resource_enabled("printing"): # On some systems, loading the OpenGL libraries needs the RTLD_GLOBAL mode. class Test_OpenGL_libs(unittest.TestCase): def setUp(self): - self.gl = self.glu = self.gle = self.glut = None + self.gl = self.glu = self.gle = None if lib_gl: self.gl = CDLL(lib_gl, mode=RTLD_GLOBAL) if lib_glu: self.glu = CDLL(lib_glu, RTLD_GLOBAL) - if lib_glut: - # On some systems, additional libraries seem to be - # required, loading glut fails with - # "OSError: /usr/lib/libglut.so.3: undefined symbol: XGetExtensionVersion" - # I cannot figure out how to repair the test on these - # systems (red hat), so we ignore it when the glut or gle - # libraries cannot be loaded. See also: - # https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1478253&group_id=5470 - # http://mail.python.org/pipermail/python-dev/2006-May/064789.html - try: - self.glut = CDLL(lib_glut) - except OSError: - pass if lib_gle: try: self.gle = CDLL(lib_gle) @@ -67,11 +50,6 @@ class Test_OpenGL_libs(unittest.TestCase): if self.glu: self.glu.gluBeginCurve - if lib_glut: - def test_glut(self): - if self.glut: - self.glut.glutWireTetrahedron - if lib_gle: def test_gle(self): if self.gle: diff --git a/Lib/ctypes/test/test_win32.py b/Lib/ctypes/test/test_win32.py index 057873c..5067b60 100644 --- a/Lib/ctypes/test/test_win32.py +++ b/Lib/ctypes/test/test_win32.py @@ -58,6 +58,15 @@ if sys.platform == "win32": self.failUnlessEqual(sizeof(wintypes.LPARAM), sizeof(c_void_p)) + def test_COMError(self): + from _ctypes import COMError + self.assertEqual(COMError.__doc__, "Raised when a COM method call failed.") + + ex = COMError(-1, "text", ("details",)) + self.assertEqual(ex.hresult, -1) + self.assertEqual(ex.text, "text") + self.assertEqual(ex.details, ("details",)) + class Structures(unittest.TestCase): def test_struct_by_value(self): diff --git a/Lib/email/test/test_email.py b/Lib/email/test/test_email.py index e1d0eb7..c7c61ef 100644 --- a/Lib/email/test/test_email.py +++ b/Lib/email/test/test_email.py @@ -1491,6 +1491,18 @@ counter to RFC 2822, there's no separating newline here self.failUnless(isinstance(bad.defects[0], Errors.StartBoundaryNotFoundDefect)) + def test_first_line_is_continuation_header(self): + eq = self.assertEqual + m = ' Line 1\nLine 2\nLine 3' + msg = email.message_from_string(m) + eq(msg.keys(), []) + eq(msg.get_payload(), 'Line 2\nLine 3') + eq(len(msg.defects), 1) + self.failUnless(isinstance(msg.defects[0], + Errors.FirstHeaderLineIsContinuationDefect)) + eq(msg.defects[0].line, ' Line 1\n') + + # Test RFC 2047 header encoding and decoding diff --git a/Lib/email/test/test_email_renamed.py b/Lib/email/test/test_email_renamed.py index 4ce2184..4688a1b 100644 --- a/Lib/email/test/test_email_renamed.py +++ b/Lib/email/test/test_email_renamed.py @@ -1489,6 +1489,17 @@ counter to RFC 2822, there's no separating newline here self.failUnless(isinstance(bad.defects[0], errors.StartBoundaryNotFoundDefect)) + def test_first_line_is_continuation_header(self): + eq = self.assertEqual + m = ' Line 1\nLine 2\nLine 3' + msg = email.message_from_string(m) + eq(msg.keys(), []) + eq(msg.get_payload(), 'Line 2\nLine 3') + eq(len(msg.defects), 1) + self.failUnless(isinstance(msg.defects[0], + errors.FirstHeaderLineIsContinuationDefect)) + eq(msg.defects[0].line, ' Line 1\n') + # Test RFC 2047 header encoding and decoding diff --git a/Lib/idlelib/FileList.py b/Lib/idlelib/FileList.py index 9c6fafe..860dbae 100644 --- a/Lib/idlelib/FileList.py +++ b/Lib/idlelib/FileList.py @@ -44,7 +44,7 @@ class FileList: return self.EditorWindow(self, filename) def close_all_callback(self, event): - for edit in self.inversedict.keys(): + for edit in list(self.inversedict): reply = edit.close() if reply == "cancel": break diff --git a/Lib/idlelib/MultiCall.py b/Lib/idlelib/MultiCall.py index 4311999..f43f83e 100644 --- a/Lib/idlelib/MultiCall.py +++ b/Lib/idlelib/MultiCall.py @@ -105,18 +105,32 @@ class _SimpleBinder: # _state_subsets gives for each combination of modifiers, or *state*, # a list of the states which are a subset of it. This list is ordered by the # number of modifiers is the state - the most specific state comes first. -# XXX rewrite without overusing functional primitives :-) _states = range(1 << len(_modifiers)) _state_names = [''.join(m[0]+'-' for i, m in enumerate(_modifiers) if (1 << i) & s) for s in _states] -_state_subsets = map(lambda i: filter(lambda j: not (j & (~i)), _states), - _states) -for l in _state_subsets: - l.sort(lambda a, b, nummod = lambda x: len(filter(lambda i: (1<<i) & x, - range(len(_modifiers)))): - nummod(b) - nummod(a)) + +def expand_substates(states): + '''For each item of states return a list containing all combinations of + that item with individual bits reset, sorted by the number of set bits. + ''' + def nbits(n): + "number of bits set in n base 2" + nb = 0 + while n: + n, rem = divmod(n, 2) + nb += rem + return nb + statelist = [] + for state in states: + substates = list(set(state & x for x in states)) + substates.sort(lambda a,b: nbits(b) - nbits(a)) + statelist.append(substates) + return statelist + +_state_subsets = expand_substates(_states) + # _state_codes gives for each state, the portable code to be passed as mc_state _state_codes = [] for s in _states: diff --git a/Lib/idlelib/PyShell.py b/Lib/idlelib/PyShell.py index 2d69157..f11e609 100644 --- a/Lib/idlelib/PyShell.py +++ b/Lib/idlelib/PyShell.py @@ -351,8 +351,6 @@ class ModifiedInterpreter(InteractiveInterpreter): def build_subprocess_arglist(self): w = ['-W' + s for s in sys.warnoptions] - if 1/2 > 0: # account for new division - w.append('-Qnew') # Maybe IDLE is installed and is being accessed via sys.path, # or maybe it's not installed and the idle.py script is being # run from the IDLE source directory. diff --git a/Lib/idlelib/WidgetRedirector.py b/Lib/idlelib/WidgetRedirector.py index 59005b8..459fea7 100644 --- a/Lib/idlelib/WidgetRedirector.py +++ b/Lib/idlelib/WidgetRedirector.py @@ -19,7 +19,7 @@ class WidgetRedirector: self.widget._w) def close(self): - for name in self.dict.keys(): + for name in list(self.dict.keys()): self.unregister(name) widget = self.widget; del self.widget orig = self.orig; del self.orig diff --git a/Lib/idlelib/idlever.py b/Lib/idlelib/idlever.py index f56b4d4..636299d 100644 --- a/Lib/idlelib/idlever.py +++ b/Lib/idlelib/idlever.py @@ -1 +1 @@ -IDLE_VERSION = "2.6a0" +IDLE_VERSION = "3.0x" diff --git a/Lib/idlelib/macosxSupport.py b/Lib/idlelib/macosxSupport.py index ad61fff..222abfc 100644 --- a/Lib/idlelib/macosxSupport.py +++ b/Lib/idlelib/macosxSupport.py @@ -3,6 +3,7 @@ A number of function that enhance IDLE on MacOSX when it used as a normal GUI application (as opposed to an X11 application). """ import sys +import Tkinter def runningAsOSXApp(): """ Returns True iff running from the IDLE.app bundle on OSX """ @@ -23,7 +24,11 @@ def addOpenEventSupport(root, flist): root.createcommand("::tk::mac::OpenDocument", doOpenFile) def hideTkConsole(root): - root.tk.call('console', 'hide') + try: + root.tk.call('console', 'hide') + except Tkinter.TclError: + # Some versions of the Tk framework don't have a console object + pass def overrideRootMenu(root, flist): """ @@ -75,32 +80,40 @@ def overrideRootMenu(root, flist): import configDialog configDialog.ConfigDialog(root, 'Settings') + root.bind('<<about-idle>>', about_dialog) root.bind('<<open-config-dialog>>', config_dialog) if flist: root.bind('<<close-all-windows>>', flist.close_all_callback) - for mname, entrylist in Bindings.menudefs: - menu = menudict.get(mname) - if not menu: - continue - for entry in entrylist: - if not entry: - menu.add_separator() + + ###check if Tk version >= 8.4.14; if so, use hard-coded showprefs binding + tkversion = root.tk.eval('info patchlevel') + if tkversion >= '8.4.14': + Bindings.menudefs[0] = ('application', [ + ('About IDLE', '<<about-idle>>'), + None, + ]) + root.createcommand('::tk::mac::ShowPreferences', config_dialog) + else: + for mname, entrylist in Bindings.menudefs: + menu = menudict.get(mname) + if not menu: + continue else: - label, eventname = entry - underline, label = prepstr(label) - accelerator = get_accelerator(Bindings.default_keydefs, + for entry in entrylist: + if not entry: + menu.add_separator() + else: + label, eventname = entry + underline, label = prepstr(label) + accelerator = get_accelerator(Bindings.default_keydefs, eventname) - def command(text=root, eventname=eventname): - text.event_generate(eventname) - menu.add_command(label=label, underline=underline, + def command(text=root, eventname=eventname): + text.event_generate(eventname) + menu.add_command(label=label, underline=underline, command=command, accelerator=accelerator) - - - - def setupApp(root, flist): """ Perform setup for the OSX application bundle. diff --git a/Lib/inspect.py b/Lib/inspect.py index b5e9ff2..45db8ee 100644 --- a/Lib/inspect.py +++ b/Lib/inspect.py @@ -690,7 +690,6 @@ def _getfullargs(co): if not iscode(co): raise TypeError('arg is not a code object') - code = co.co_code nargs = co.co_argcount names = co.co_varnames nkwargs = co.co_kwonlyargcount diff --git a/Lib/lib-tk/Tkinter.py b/Lib/lib-tk/Tkinter.py index 1e4d771..ef588e8 100644 --- a/Lib/lib-tk/Tkinter.py +++ b/Lib/lib-tk/Tkinter.py @@ -1680,7 +1680,7 @@ class Tk(Misc, Wm): def destroy(self): """Destroy this and all descendants widgets. This will end the application of this Tcl interpreter.""" - for c in self.children.values(): c.destroy() + for c in list(self.children.values()): c.destroy() self.tk.call('destroy', self._w) Misc.destroy(self) global _default_root diff --git a/Lib/mailbox.py b/Lib/mailbox.py index b9f4497..fdb118d 100755 --- a/Lib/mailbox.py +++ b/Lib/mailbox.py @@ -107,7 +107,7 @@ class Mailbox: yield value def __iter__(self): - return self.values() + return self.itervalues() def values(self): """Return a list of messages. Memory intensive.""" @@ -456,7 +456,11 @@ class Maildir(Mailbox): """Update table of contents mapping.""" self._toc = {} for subdir in ('new', 'cur'): - for entry in os.listdir(os.path.join(self._path, subdir)): + subdir_path = os.path.join(self._path, subdir) + for entry in os.listdir(subdir_path): + p = os.path.join(subdir_path, entry) + if os.path.isdir(p): + continue uniq = entry.split(self.colon)[0] self._toc[uniq] = os.path.join(subdir, entry) diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 8c4b5e5..de7807c 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -335,12 +335,12 @@ def main(tests=None, testdir=None, verbose=0, quiet=False, generate=False, tests = map(removepy, tests) stdtests = STDTESTS[:] - nottests = NOTTESTS[:] + nottests = NOTTESTS.copy() if exclude: for arg in args: if arg in stdtests: stdtests.remove(arg) - nottests[:0] = args + nottests.add(arg) args = [] tests = tests or args or findtests(testdir, stdtests, nottests) if single: @@ -478,14 +478,14 @@ STDTESTS = [ 'test_unittest', 'test_doctest', 'test_doctest2', - ] +] -NOTTESTS = [ +NOTTESTS = { 'test_support', 'test_future1', 'test_future2', 'test_future3', - ] +} def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): """Return a list of all applicable test modules.""" @@ -818,12 +818,14 @@ def printlist(x, width=70, indent=4): # test_timeout # Controlled by test_timeout.skip_expected. Requires the network # resource and a socket module. +# +# Tests that are expected to be skipped everywhere except on one platform +# are also handled separately. _expectations = { 'win32': """ test__locale - test_applesingle test_bsddb3 test_commands test_crypt @@ -836,9 +838,7 @@ _expectations = { test_grp test_ioctl test_largefile - test_linuxaudiodev test_mhlib - test_nis test_openpty test_ossaudiodev test_poll @@ -847,24 +847,16 @@ _expectations = { test_pwd test_resource test_signal - test_sunaudiodev test_threadsignals test_wait3 test_wait4 """, 'linux2': """ - test_applesingle test_curses test_dl test_largefile - test_linuxaudiodev - test_nis - test_ntpath test_ossaudiodev - test_sqlite - test_startfile - test_sunaudiodev """, 'mac': """ @@ -882,11 +874,8 @@ _expectations = { test_grp test_ioctl test_largefile - test_linuxaudiodev test_locale test_mmap - test_nis - test_ntpath test_openpty test_ossaudiodev test_poll @@ -896,69 +885,45 @@ _expectations = { test_pwd test_resource test_signal - test_sqlite - test_startfile - test_sunaudiodev test_sundry test_tarfile """, 'unixware7': """ - test_applesingle test_bsddb test_dl test_largefile - test_linuxaudiodev test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_sax - test_startfile - test_sqlite - test_sunaudiodev test_sundry """, 'openunix8': """ - test_applesingle test_bsddb test_dl test_largefile - test_linuxaudiodev test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_sax - test_sqlite - test_startfile - test_sunaudiodev test_sundry """, 'sco_sv3': """ - test_applesingle test_asynchat test_bsddb test_dl test_fork1 test_gettext test_largefile - test_linuxaudiodev test_locale test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_queue test_sax - test_sqlite - test_startfile - test_sunaudiodev test_sundry test_thread test_threaded_import @@ -967,7 +932,6 @@ _expectations = { """, 'riscos': """ - test_applesingle test_asynchat test_atexit test_bsddb @@ -981,18 +945,12 @@ _expectations = { test_gdbm test_grp test_largefile - test_linuxaudiodev test_locale test_mmap - test_nis - test_ntpath test_openpty test_poll test_pty test_pwd - test_sqlite - test_startfile - test_sunaudiodev test_sundry test_thread test_threaded_import @@ -1001,209 +959,135 @@ _expectations = { """, 'darwin': """ + test__locale + test_bsddb + test_bsddb3 + test_curses test_gdbm test_largefile - test_linuxaudiodev test_locale - test_nis test_ossaudiodev - test_startfile - test_sunaudiodev + test_poll """, 'sunos5': """ - test_applesingle test_bsddb test_curses test_dbm test_gdbm test_gzip - test_linuxaudiodev test_openpty - test_sqlite - test_startfile test_zipfile test_zlib """, 'hp-ux11': """ - test_applesingle test_bsddb test_curses test_dl test_gdbm test_gzip test_largefile - test_linuxaudiodev test_locale test_minidom - test_nis - test_ntpath test_openpty test_pyexpat test_sax - test_sqlite - test_startfile - test_sunaudiodev test_zipfile test_zlib """, 'atheos': """ - test_applesingle test_curses test_dl test_gdbm test_largefile - test_linuxaudiodev test_locale test_mhlib test_mmap - test_nis test_poll test_resource - test_sqlite - test_startfile - test_sunaudiodev """, 'cygwin': """ - test_applesingle test_bsddb3 test_curses test_dbm test_ioctl test_largefile - test_linuxaudiodev test_locale - test_nis test_ossaudiodev test_socketserver - test_sqlite - test_sunaudiodev """, 'os2emx': """ - test_applesingle test_audioop test_bsddb3 test_commands test_curses test_dl test_largefile - test_linuxaudiodev test_mhlib test_mmap - test_nis test_openpty test_ossaudiodev test_pty test_resource test_signal - test_sqlite - test_startfile - test_sunaudiodev """, 'freebsd4': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_gdbm - test_linuxaudiodev test_locale - test_macostools - test_nis test_ossaudiodev test_pep277 - test_plistlib test_pty - test_scriptpackages test_socket_ssl test_socketserver - test_sqlite - test_startfile - test_sunaudiodev test_tcl test_timeout - test_unicode_file test_urllibnet - test_winreg - test_winsound """, 'aix5': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_bz2 test_dl test_gdbm test_gzip - test_linuxaudiodev - test_macostools - test_nis test_ossaudiodev - test_sqlite - test_startfile - test_sunaudiodev test_tcl - test_winreg - test_winsound test_zipimport test_zlib """, 'openbsd3': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_ctypes test_dl test_gdbm - test_linuxaudiodev test_locale - test_macostools - test_nis test_normalization test_ossaudiodev test_pep277 - test_plistlib - test_scriptpackages test_tcl - test_sqlite - test_startfile - test_sunaudiodev - test_unicode_file - test_winreg - test_winsound """, 'netbsd3': """ - test_aepack - test_applesingle test_bsddb test_bsddb3 test_ctypes test_curses test_dl test_gdbm - test_linuxaudiodev test_locale - test_macostools - test_nis test_ossaudiodev test_pep277 - test_sqlite - test_startfile - test_sunaudiodev test_tcl - test_unicode_file - test_winreg - test_winsound """, } _expectations['freebsd5'] = _expectations['freebsd4'] @@ -1221,6 +1105,9 @@ class _ExpectedSkips: s = _expectations[sys.platform] self.expected = set(s.split()) + # expected to be skipped on every platform, even Linux + self.expected.add('test_linuxaudiodev') + if not os.path.supports_unicode_filenames: self.expected.add('test_pep277') @@ -1232,20 +1119,23 @@ class _ExpectedSkips: if not sys.platform in ("mac", "darwin"): MAC_ONLY = ["test_macostools", "test_aepack", - "test_plistlib", "test_scriptpackages"] + "test_plistlib", "test_scriptpackages", + "test_applesingle"] for skip in MAC_ONLY: self.expected.add(skip) if sys.platform != "win32": + # test_sqlite is only reliable on Windows where the library + # is distributed with Python WIN_ONLY = ["test_unicode_file", "test_winreg", - "test_winsound"] + "test_winsound", "test_startfile", + "test_sqlite"] for skip in WIN_ONLY: self.expected.add(skip) - if sys.platform != 'irix': - IRIX_ONLY =["test_imageop"] - for skip in IRIX_ONLY: - self.expected.add(skip) + if sys.platform != 'sunos5': + self.expected.add('test_sunaudiodev') + self.expected.add('test_nis') self.valid = True diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py new file mode 100644 index 0000000..7602b9f --- /dev/null +++ b/Lib/test/test_asyncore.py @@ -0,0 +1,406 @@ +import asyncore +import unittest +import select +import os +import socket +import threading +import sys +import time + +from test import test_support +from test.test_support import TESTFN, run_unittest, unlink +from StringIO import StringIO + +HOST = "127.0.0.1" +PORT = 54329 + +class dummysocket: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + + def fileno(self): + return 42 + +class dummychannel: + def __init__(self): + self.socket = dummysocket() + +class exitingdummy: + def __init__(self): + pass + + def handle_read_event(self): + raise asyncore.ExitNow() + + handle_write_event = handle_read_event + handle_expt_event = handle_read_event + +class crashingdummy: + def __init__(self): + self.error_handled = False + + def handle_read_event(self): + raise Exception() + + handle_write_event = handle_read_event + handle_expt_event = handle_read_event + + def handle_error(self): + self.error_handled = True + +# used when testing senders; just collects what it gets until newline is sent +def capture_server(evt, buf): + serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv.settimeout(3) + serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + serv.bind(("", PORT)) + serv.listen(5) + try: + conn, addr = serv.accept() + except socket.timeout: + pass + else: + n = 200 + while n > 0: + data = conn.recv(10) + # keep everything except for the newline terminator + buf.write(data.replace('\n', '')) + if '\n' in data: + break + n -= 1 + time.sleep(0.01) + + conn.close() + finally: + serv.close() + evt.set() + + +class HelperFunctionTests(unittest.TestCase): + def test_readwriteexc(self): + # Check exception handling behavior of read, write and _exception + + # check that ExitNow exceptions in the object handler method + # bubbles all the way up through asyncore read/write/_exception calls + tr1 = exitingdummy() + self.assertRaises(asyncore.ExitNow, asyncore.read, tr1) + self.assertRaises(asyncore.ExitNow, asyncore.write, tr1) + self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1) + + # check that an exception other than ExitNow in the object handler + # method causes the handle_error method to get called + tr2 = crashingdummy() + asyncore.read(tr2) + self.assertEqual(tr2.error_handled, True) + + tr2 = crashingdummy() + asyncore.write(tr2) + self.assertEqual(tr2.error_handled, True) + + tr2 = crashingdummy() + asyncore._exception(tr2) + self.assertEqual(tr2.error_handled, True) + +## Commented out these tests because test a non-documented function +## (which is actually public, why it's not documented?). Anyway, the +## tests *and* the function uses constants in the select module that +## are not present in Windows systems (see this thread: +## http://mail.python.org/pipermail/python-list/2001-October/109973.html) +## Note even that these constants are mentioned in the select +## documentation, as a parameter of "poll" method "register", but are +## not explicit declared as constants of the module. +## . Facundo Batista +## +## def test_readwrite(self): +## # Check that correct methods are called by readwrite() +## +## class testobj: +## def __init__(self): +## self.read = False +## self.write = False +## self.expt = False +## +## def handle_read_event(self): +## self.read = True +## +## def handle_write_event(self): +## self.write = True +## +## def handle_expt_event(self): +## self.expt = True +## +## def handle_error(self): +## self.error_handled = True +## +## for flag in (select.POLLIN, select.POLLPRI): +## tobj = testobj() +## self.assertEqual(tobj.read, False) +## asyncore.readwrite(tobj, flag) +## self.assertEqual(tobj.read, True) +## +## # check that ExitNow exceptions in the object handler method +## # bubbles all the way up through asyncore readwrite call +## tr1 = exitingdummy() +## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) +## +## # check that an exception other than ExitNow in the object handler +## # method causes the handle_error method to get called +## tr2 = crashingdummy() +## asyncore.readwrite(tr2, flag) +## self.assertEqual(tr2.error_handled, True) +## +## tobj = testobj() +## self.assertEqual(tobj.write, False) +## asyncore.readwrite(tobj, select.POLLOUT) +## self.assertEqual(tobj.write, True) +## +## # check that ExitNow exceptions in the object handler method +## # bubbles all the way up through asyncore readwrite call +## tr1 = exitingdummy() +## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, +## select.POLLOUT) +## +## # check that an exception other than ExitNow in the object handler +## # method causes the handle_error method to get called +## tr2 = crashingdummy() +## asyncore.readwrite(tr2, select.POLLOUT) +## self.assertEqual(tr2.error_handled, True) +## +## for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL): +## tobj = testobj() +## self.assertEqual(tobj.expt, False) +## asyncore.readwrite(tobj, flag) +## self.assertEqual(tobj.expt, True) +## +## # check that ExitNow exceptions in the object handler method +## # bubbles all the way up through asyncore readwrite calls +## tr1 = exitingdummy() +## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) +## +## # check that an exception other than ExitNow in the object handler +## # method causes the handle_error method to get called +## tr2 = crashingdummy() +## asyncore.readwrite(tr2, flag) +## self.assertEqual(tr2.error_handled, True) + + def test_closeall(self): + self.closeall_check(False) + + def test_closeall_default(self): + self.closeall_check(True) + + def closeall_check(self, usedefault): + # Check that close_all() closes everything in a given map + + l = [] + testmap = {} + for i in range(10): + c = dummychannel() + l.append(c) + self.assertEqual(c.socket.closed, False) + testmap[i] = c + + if usedefault: + socketmap = asyncore.socket_map + try: + asyncore.socket_map = testmap + asyncore.close_all() + finally: + testmap, asyncore.socket_map = asyncore.socket_map, socketmap + else: + asyncore.close_all(testmap) + + self.assertEqual(len(testmap), 0) + + for c in l: + self.assertEqual(c.socket.closed, True) + + def test_compact_traceback(self): + try: + raise Exception("I don't like spam!") + except: + real_t, real_v, real_tb = sys.exc_info() + r = asyncore.compact_traceback() + else: + self.fail("Expected exception") + + (f, function, line), t, v, info = r + self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py') + self.assertEqual(function, 'test_compact_traceback') + self.assertEqual(t, real_t) + self.assertEqual(v, real_v) + self.assertEqual(info, '[%s|%s|%s]' % (f, function, line)) + + +class DispatcherTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + asyncore.close_all() + + def test_basic(self): + d = asyncore.dispatcher() + self.assertEqual(d.readable(), True) + self.assertEqual(d.writable(), True) + + def test_repr(self): + d = asyncore.dispatcher() + self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d)) + + def test_log(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log() (to stderr) + fp = StringIO() + stderr = sys.stderr + l1 = "Lovely spam! Wonderful spam!" + l2 = "I don't like spam!" + try: + sys.stderr = fp + d.log(l1) + d.log(l2) + finally: + sys.stderr = stderr + + lines = fp.getvalue().splitlines() + self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2]) + + def test_log_info(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log_info() (to stdout via print) + fp = StringIO() + stdout = sys.stdout + l1 = "Have you got anything without spam?" + l2 = "Why can't she have egg bacon spam and sausage?" + l3 = "THAT'S got spam in it!" + try: + sys.stdout = fp + d.log_info(l1, 'EGGS') + d.log_info(l2) + d.log_info(l3, 'SPAM') + finally: + sys.stdout = stdout + + lines = fp.getvalue().splitlines() + if __debug__: + expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] + else: + expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3] + + self.assertEquals(lines, expected) + + def test_unhandled(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log_info() (to stdout via print) + fp = StringIO() + stdout = sys.stdout + try: + sys.stdout = fp + d.handle_expt() + d.handle_read() + d.handle_write() + d.handle_connect() + d.handle_accept() + finally: + sys.stdout = stdout + + lines = fp.getvalue().splitlines() + expected = ['warning: unhandled exception', + 'warning: unhandled read event', + 'warning: unhandled write event', + 'warning: unhandled connect event', + 'warning: unhandled accept event'] + self.assertEquals(lines, expected) + + + +class dispatcherwithsend_noread(asyncore.dispatcher_with_send): + def readable(self): + return False + + def handle_connect(self): + pass + +class DispatcherWithSendTests(unittest.TestCase): + usepoll = False + + def setUp(self): + pass + + def tearDown(self): + asyncore.close_all() + + def test_send(self): + self.evt = threading.Event() + cap = StringIO() + threading.Thread(target=capture_server, args=(self.evt,cap)).start() + time.sleep(1) # Give server time to initialize + + data = "Suppose there isn't a 16-ton weight?"*5 + d = dispatcherwithsend_noread() + d.create_socket(socket.AF_INET, socket.SOCK_STREAM) + d.connect((HOST, PORT)) + d.send(data) + d.send('\n') + + n = 1000 + while d.out_buffer and n > 0: + asyncore.poll() + n -= 1 + + self.evt.wait() + + self.assertEqual(cap.getvalue(), data) + + +class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): + usepoll = True + +if hasattr(asyncore, 'file_wrapper'): + class FileWrapperTest(unittest.TestCase): + def setUp(self): + self.d = "It's not dead, it's sleeping!" + file(TESTFN, 'w').write(self.d) + + def tearDown(self): + unlink(TESTFN) + + def test_recv(self): + fd = os.open(TESTFN, os.O_RDONLY) + w = asyncore.file_wrapper(fd) + + self.assertEqual(w.fd, fd) + self.assertEqual(w.fileno(), fd) + self.assertEqual(w.recv(13), "It's not dead") + self.assertEqual(w.read(6), ", it's") + w.close() + self.assertRaises(OSError, w.read, 1) + + def test_send(self): + d1 = "Come again?" + d2 = "I want to buy some cheese." + fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) + w = asyncore.file_wrapper(fd) + + w.write(d1) + w.send(d2) + w.close() + self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) + + +def test_main(): + tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests, + DispatcherWithSendTests_UsePoll] + if hasattr(asyncore, 'file_wrapper'): + tests.append(FileWrapperTest) + + run_unittest(*tests) + +if __name__ == "__main__": + test_main() diff --git a/Lib/test/test_format.py b/Lib/test/test_format.py index 085768d..ade3132 100644 --- a/Lib/test/test_format.py +++ b/Lib/test/test_format.py @@ -9,6 +9,7 @@ maxsize = MAX_Py_ssize_t # test on unicode strings as well overflowok = 1 +overflowrequired = 0 def testformat(formatstr, args, output=None): if verbose: @@ -25,7 +26,11 @@ def testformat(formatstr, args, output=None): if verbose: print('overflow (this is fine)') else: - if output and result != output: + if overflowrequired: + if verbose: + print('no') + print("overflow expected on %r %% %r" % (formatstr, args)) + elif output and result != output: if verbose: print('no') print("%r %% %r == %r != %r" %\ @@ -56,6 +61,14 @@ testboth("%#.*g", (110, -1.e+100/3.)) # test some ridiculously large precision, expect overflow testboth('%12.*f', (123456, 1.0)) +# check for internal overflow validation on length of precision +overflowrequired = 1 +testboth("%#.*g", (110, -1.e+100/3.)) +testboth("%#.*G", (110, -1.e+100/3.)) +testboth("%#.*f", (110, -1.e+100/3.)) +testboth("%#.*F", (110, -1.e+100/3.)) +overflowrequired = 0 + # Formatting of long integers. Overflow is not ok overflowok = 0 testboth("%x", 10, "a") diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py index 8f45382..bfc77fe 100644 --- a/Lib/test/test_grammar.py +++ b/Lib/test/test_grammar.py @@ -697,6 +697,20 @@ class GrammarTests(unittest.TestCase): def meth2(self, arg): pass def meth3(self, a1, a2): pass + # decorator: '@' dotted_name [ '(' [arglist] ')' ] NEWLINE + # decorators: decorator+ + # decorated: decorators (classdef | funcdef) + def class_decorator(x): return x + @class_decorator + class G: pass + + def testDictcomps(self): + # dictorsetmaker: ( (test ':' test (comp_for | + # (',' test ':' test)* [','])) | + # (test (comp_for | (',' test)* [','])) ) + nums = [1, 2, 3] + self.assertEqual({i:i+1 for i in nums}, {1: 2, 2: 3, 3: 4}) + def testListcomps(self): # list comprehension tests nums = [1, 2, 3, 4, 5] diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 9d50225..e5930d8 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -427,8 +427,8 @@ class TestMailboxSuperclass(TestBase): self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) self.assertRaises(NotImplementedError, lambda: box.keys()) self.assertRaises(NotImplementedError, lambda: box.keys()) - self.assertRaises(NotImplementedError, lambda: box.values().next()) - self.assertRaises(NotImplementedError, lambda: box.__iter__().next()) + self.assertRaises(NotImplementedError, lambda: box.values().__next__()) + self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) self.assertRaises(NotImplementedError, lambda: box.values()) self.assertRaises(NotImplementedError, lambda: box.items().next()) self.assertRaises(NotImplementedError, lambda: box.items()) @@ -679,7 +679,18 @@ class TestMaildir(TestMailbox): folder1_alias = box.get_folder('folder1') self.assert_(folder1_alias._factory is dummy_factory) + def test_directory_in_folder (self): + # Test that mailboxes still work if there's a stray extra directory + # in a folder. + for i in range(10): + self._box.add(mailbox.Message(_sample_message)) + + # Create a stray directory + os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) + # Check that looping still works with the directory present. + for msg in self._box: + pass class _TestMboxMMDF(TestMailbox): diff --git a/Lib/test/test_parser.py b/Lib/test/test_parser.py index ddb58b5..dbf6a92 100644 --- a/Lib/test/test_parser.py +++ b/Lib/test/test_parser.py @@ -443,6 +443,12 @@ class CompileTestCase(unittest.TestCase): st = parser.suite('1 = 3 + 4') self.assertRaises(SyntaxError, parser.compilest, st) + def test_compile_badunicode(self): + st = parser.suite('a = u"\U12345678"') + self.assertRaises(SyntaxError, parser.compilest, st) + st = parser.suite('a = u"\u1"') + self.assertRaises(SyntaxError, parser.compilest, st) + def test_main(): test_support.run_unittest( RoundtripLegalSyntaxTestCase, diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 99e3047..488e9e2 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -380,6 +380,12 @@ class MockPasswordManager: class OpenerDirectorTests(unittest.TestCase): + def test_add_non_handler(self): + class NonHandler(object): + pass + self.assertRaises(TypeError, + OpenerDirector().add_handler, NonHandler()) + def test_badly_named_methods(self): # test work-around for three methods that accidentally follow the # naming conventions for handler methods diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 791cca7..0b8a963 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -1,5 +1,7 @@ +import base64 import datetime import sys +import time import unittest import xmlrpclib from test import test_support @@ -18,6 +20,10 @@ alist = [{'astring': 'foo@bar.baz.spam', (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 'datetime3': xmlrpclib.DateTime( datetime.datetime(2005, 2, 10, 11, 41, 23)), + 'datetime4': xmlrpclib.DateTime( + datetime.date(2005, 2, 10)), + 'datetime5': xmlrpclib.DateTime( + datetime.time(11, 41, 23)), }] class XMLRPCTestCase(unittest.TestCase): @@ -94,11 +100,37 @@ class XMLRPCTestCase(unittest.TestCase): def test_dump_bad_dict(self): self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) + def test_dump_recursive_seq(self): + l = [1,2,3] + t = [3,4,5,l] + l.append(t) + self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) + + def test_dump_recursive_dict(self): + d = {'1':1, '2':1} + t = {'3':3, 'd':d} + d['t'] = t + self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) + def test_dump_big_int(self): if sys.maxint > 2**31-1: self.assertRaises(OverflowError, xmlrpclib.dumps, (int(2**34),)) + xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) + self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,)) + self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,)) + + def dummy_write(s): + pass + + m = xmlrpclib.Marshaller() + m.dump_int(xmlrpclib.MAXINT, dummy_write) + m.dump_int(xmlrpclib.MININT, dummy_write) + self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write) + self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write) + + def test_dump_none(self): value = alist + [None] arg1 = (alist + [None],) @@ -146,8 +178,109 @@ class XMLRPCTestCase(unittest.TestCase): self.assert_(isinstance(items[0][0], str)) self.assert_(isinstance(items[0][1], str)) + +class HelperTestCase(unittest.TestCase): + def test_escape(self): + self.assertEqual(xmlrpclib.escape("a&b"), "a&b") + self.assertEqual(xmlrpclib.escape("a<b"), "a<b") + self.assertEqual(xmlrpclib.escape("a>b"), "a>b") + +class FaultTestCase(unittest.TestCase): + def test_repr(self): + f = xmlrpclib.Fault(42, 'Test Fault') + self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") + self.assertEqual(repr(f), str(f)) + + def test_dump_fault(self): + f = xmlrpclib.Fault(42, 'Test Fault') + s = xmlrpclib.dumps((f,)) + (newf,), m = xmlrpclib.loads(s) + self.assertEquals(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) + self.assertEquals(m, None) + + s = xmlrpclib.Marshaller().dumps(f) + self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) + + +class DateTimeTestCase(unittest.TestCase): + def test_default(self): + t = xmlrpclib.DateTime() + + def test_time(self): + d = 1181399930.036952 + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) + + def test_time_tuple(self): + d = (2007,6,9,10,38,50,5,160,0) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), '20070609T10:38:50') + + def test_time_struct(self): + d = time.localtime(1181399930.036952) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) + + def test_datetime_datetime(self): + d = datetime.datetime(2007,1,2,3,4,5) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), '20070102T03:04:05') + + def test_datetime_date(self): + d = datetime.date(2007,9,8) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t), '20070908T00:00:00') + + def test_datetime_time(self): + d = datetime.time(13,17,19) + # allow for date rollover by checking today's or tomorrow's dates + dd1 = datetime.datetime.now().date() + dd2 = dd1 + datetime.timedelta(days=1) + vals = (dd1.strftime('%Y%m%dT13:17:19'), + dd2.strftime('%Y%m%dT13:17:19')) + t = xmlrpclib.DateTime(d) + self.assertEqual(str(t) in vals, True) + + def test_repr(self): + d = datetime.datetime(2007,1,2,3,4,5) + t = xmlrpclib.DateTime(d) + val ="<DateTime '20070102T03:04:05' at %x>" % id(t) + self.assertEqual(repr(t), val) + + def test_decode(self): + d = ' 20070908T07:11:13 ' + t1 = xmlrpclib.DateTime() + t1.decode(d) + tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) + self.assertEqual(t1, tref) + + t2 = xmlrpclib._datetime(d) + self.assertEqual(t1, tref) + +class BinaryTestCase(unittest.TestCase): + def test_default(self): + t = xmlrpclib.Binary() + self.assertEqual(str(t), '') + + def test_string(self): + d = '\x01\x02\x03abc123\xff\xfe' + t = xmlrpclib.Binary(d) + self.assertEqual(str(t), d) + + def test_decode(self): + d = '\x01\x02\x03abc123\xff\xfe' + de = base64.encodestring(d) + t1 = xmlrpclib.Binary() + t1.decode(de) + self.assertEqual(str(t1), d) + + t2 = xmlrpclib._binary(de) + self.assertEqual(str(t2), d) + + def test_main(): - test_support.run_unittest(XMLRPCTestCase) + test_support.run_unittest(XMLRPCTestCase, HelperTestCase, + DateTimeTestCase, BinaryTestCase, FaultTestCase) if __name__ == "__main__": diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 197170a..eda6863 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -12,7 +12,7 @@ import test.test_support as support from test.test_support import TESTFN, run_unittest TESTFN2 = TESTFN + "2" -FIXEDTEST_SIZE = 10 +FIXEDTEST_SIZE = 1000 class TestsWithSourceFile(unittest.TestCase): def setUp(self): @@ -232,6 +232,63 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(zipfp.namelist(), ["absolute"]) zipfp.close() + def testAppendToZipFile(self): + # Test appending to an existing zipfile + zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) + zipfp.write(TESTFN, TESTFN) + zipfp.close() + zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) + zipfp.writestr("strfile", self.data) + self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) + zipfp.close() + + def testAppendToNonZipFile(self): + # Test appending to an existing file that is not a zipfile + # NOTE: this test fails if len(d) < 22 because of the first + # line "fpin.seek(-22, 2)" in _EndRecData + d = 'I am not a ZipFile!'*10 + f = file(TESTFN2, 'wb') + f.write(d) + f.close() + zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) + zipfp.write(TESTFN, TESTFN) + zipfp.close() + + f = file(TESTFN2, 'rb') + f.seek(len(d)) + zipfp = zipfile.ZipFile(f, "r") + self.assertEqual(zipfp.namelist(), [TESTFN]) + zipfp.close() + f.close() + + def test_WriteDefaultName(self): + # Check that calling ZipFile.write without arcname specified produces the expected result + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.write(TESTFN) + self.assertEqual(zipfp.read(TESTFN), file(TESTFN).read()) + zipfp.close() + + def test_PerFileCompression(self): + # Check that files within a Zip archive can have different compression options + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) + zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) + sinfo = zipfp.getinfo('storeme') + dinfo = zipfp.getinfo('deflateme') + self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) + self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) + zipfp.close() + + def test_WriteToReadonly(self): + # Check that trying to call write() on a readonly ZipFile object + # raises a RuntimeError + zipf = zipfile.ZipFile(TESTFN2, mode="w") + zipf.writestr("somefile.txt", "bogus") + zipf.close() + zipf = zipfile.ZipFile(TESTFN2, mode="r") + self.assertRaises(RuntimeError, zipf.write, TESTFN) + zipf.close() + def tearDown(self): os.remove(TESTFN) os.remove(TESTFN2) @@ -349,7 +406,6 @@ class TestZip64InSmallFiles(unittest.TestCase): self.assertEqual(zipfp.namelist(), ["absolute"]) zipfp.close() - def tearDown(self): zipfile.ZIP64_LIMIT = self._limit os.remove(TESTFN) @@ -420,6 +476,11 @@ class PyZipFileTests(unittest.TestCase): finally: shutil.rmtree(TESTFN2) + def testWriteNonPyfile(self): + zipfp = zipfile.PyZipFile(TemporaryFile(), "w") + file(TESTFN, 'w').write('most definitely not a python file') + self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) + os.remove(TESTFN) class OtherTests(unittest.TestCase): @@ -501,7 +562,56 @@ class OtherTests(unittest.TestCase): # a RuntimeError, and so should calling .testzip. An earlier # version of .testzip would swallow this exception (and any other) # and report that the first file in the archive was corrupt. + self.assertRaises(RuntimeError, zipf.read, "foo.txt") + self.assertRaises(RuntimeError, zipf.open, "foo.txt") self.assertRaises(RuntimeError, zipf.testzip) + self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus") + file(TESTFN, 'w').write('zipfile test data') + self.assertRaises(RuntimeError, zipf.write, TESTFN) + + def test_BadConstructorMode(self): + # Check that bad modes passed to ZipFile constructor are caught + self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q") + + def test_BadOpenMode(self): + # Check that bad modes passed to ZipFile.open are caught + zipf = zipfile.ZipFile(TESTFN, mode="w") + zipf.writestr("foo.txt", "O, for a Muse of Fire!") + zipf.close() + zipf = zipfile.ZipFile(TESTFN, mode="r") + # read the data to make sure the file is there + zipf.read("foo.txt") + self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q") + zipf.close() + + def test_Read0(self): + # Check that calling read(0) on a ZipExtFile object returns an empty + # string and doesn't advance file pointer + zipf = zipfile.ZipFile(TESTFN, mode="w") + zipf.writestr("foo.txt", "O, for a Muse of Fire!") + # read the data to make sure the file is there + f = zipf.open("foo.txt") + for i in range(FIXEDTEST_SIZE): + self.assertEqual(f.read(0), '') + + self.assertEqual(f.read(), "O, for a Muse of Fire!") + zipf.close() + + def test_OpenNonexistentItem(self): + # Check that attempting to call open() for an item that doesn't + # exist in the archive raises a RuntimeError + zipf = zipfile.ZipFile(TESTFN, mode="w") + self.assertRaises(KeyError, zipf.open, "foo.txt", "r") + + def test_BadCompressionMode(self): + # Check that bad compression methods passed to ZipFile.open are caught + self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1) + + def test_NullByteInFilename(self): + # Check that a filename containing a null byte is properly terminated + zipf = zipfile.ZipFile(TESTFN, mode="w") + zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!") + self.assertEqual(zipf.namelist(), ['foo.txt']) def tearDown(self): support.unlink(TESTFN) diff --git a/Lib/urllib2.py b/Lib/urllib2.py index 9e95fda..9c773fc 100644 --- a/Lib/urllib2.py +++ b/Lib/urllib2.py @@ -293,6 +293,10 @@ class OpenerDirector: self.process_request = {} def add_handler(self, handler): + if not hasattr(handler, "add_parent"): + raise TypeError("expected BaseHandler instance, got %r" % + type(handler)) + added = False for meth in dir(handler): if meth in ["redirect_request", "do_open", "proxy_open"]: diff --git a/Lib/zipfile.py b/Lib/zipfile.py index 4791aea..63551a6 100644 --- a/Lib/zipfile.py +++ b/Lib/zipfile.py @@ -574,8 +574,9 @@ class ZipFile: def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False): """Open the ZIP file with mode read "r", write "w" or append "a".""" - self._allowZip64 = allowZip64 - self._didModify = False + if mode not in ("r", "w", "a"): + raise RuntimeError('ZipFile() requires mode "r", "w", or "a"') + if compression == ZIP_STORED: pass elif compression == ZIP_DEFLATED: @@ -584,6 +585,9 @@ class ZipFile: "Compression requires the (missing) zlib module" else: raise RuntimeError, "That compression method is not supported" + + self._allowZip64 = allowZip64 + self._didModify = False self.debug = 0 # Level of printing: 0 through 3 self.NameToInfo = {} # Find file info given name self.filelist = [] # List of ZipInfo instances for archive @@ -729,7 +733,12 @@ class ZipFile: def getinfo(self, name): """Return the instance of ZipInfo given 'name'.""" - return self.NameToInfo[name] + info = self.NameToInfo.get(name) + if info is None: + raise KeyError( + 'There is no item named %r in the archive' % name) + + return info def setpassword(self, pwd): """Set default password for encrypted files.""" @@ -834,6 +843,10 @@ class ZipFile: def write(self, filename, arcname=None, compress_type=None): """Put the bytes from filename into the archive under the name arcname.""" + if not self.fp: + raise RuntimeError( + "Attempt to write to ZIP archive that was already closed") + st = os.stat(filename) mtime = time.localtime(st.st_mtime) date_time = mtime[0:6] @@ -906,6 +919,11 @@ class ZipFile: zinfo.compress_type = self.compression else: zinfo = zinfo_or_arcname + + if not self.fp: + raise RuntimeError( + "Attempt to write to ZIP archive that was already closed") + zinfo.file_size = len(bytes) # Uncompressed size zinfo.header_offset = self.fp.tell() # Start of header bytes self._writecheck(zinfo) |