diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2013-11-03 19:31:38 (GMT) |
---|---|---|
committer | Serhiy Storchaka <storchaka@gmail.com> | 2013-11-03 19:31:38 (GMT) |
commit | 43767638a9590689fd8bcd1fcedd15bbe4660856 (patch) | |
tree | 8e96e92a7d1e7af2a20aaa15c9383e0d570acacc /Lib | |
parent | 7a07cc90c73b5a5587536c2f3fc5c2909b59be39 (diff) | |
download | cpython-43767638a9590689fd8bcd1fcedd15bbe4660856.zip cpython-43767638a9590689fd8bcd1fcedd15bbe4660856.tar.gz cpython-43767638a9590689fd8bcd1fcedd15bbe4660856.tar.bz2 |
Issue #18702: All skipped tests now reported as skipped.
Diffstat (limited to 'Lib')
-rwxr-xr-x | Lib/test/test_array.py | 17 | ||||
-rw-r--r-- | Lib/test/test_compileall.py | 3 | ||||
-rw-r--r-- | Lib/test/test_csv.py | 143 | ||||
-rw-r--r-- | Lib/test/test_dbm_dumb.py | 6 | ||||
-rw-r--r-- | Lib/test/test_enumerate.py | 3 | ||||
-rw-r--r-- | Lib/test/test_ftplib.py | 14 | ||||
-rw-r--r-- | Lib/test/test_mailbox.py | 36 | ||||
-rw-r--r-- | Lib/test/test_math.py | 61 | ||||
-rw-r--r-- | Lib/test/test_mmap.py | 150 | ||||
-rw-r--r-- | Lib/test/test_nntplib.py | 74 | ||||
-rw-r--r-- | Lib/test/test_os.py | 493 | ||||
-rw-r--r-- | Lib/test/test_poplib.py | 158 | ||||
-rw-r--r-- | Lib/test/test_posix.py | 314 | ||||
-rw-r--r-- | Lib/test/test_set.py | 8 | ||||
-rw-r--r-- | Lib/test/test_shutil.py | 123 | ||||
-rw-r--r-- | Lib/test/test_socket.py | 103 | ||||
-rw-r--r-- | Lib/test/test_socketserver.py | 85 | ||||
-rw-r--r-- | Lib/test/test_sys.py | 17 | ||||
-rw-r--r-- | Lib/test/test_warnings.py | 3 | ||||
-rw-r--r-- | Lib/test/test_zlib.py | 183 |
20 files changed, 1017 insertions, 977 deletions
diff --git a/Lib/test/test_array.py b/Lib/test/test_array.py index a44ee07..a5c4f23 100755 --- a/Lib/test/test_array.py +++ b/Lib/test/test_array.py @@ -11,6 +11,7 @@ import operator import io import math import struct +import sys import warnings import array @@ -993,15 +994,15 @@ class BaseTest: s = None self.assertRaises(ReferenceError, len, p) + @unittest.skipUnless(hasattr(sys, 'getrefcount'), + 'test needs sys.getrefcount()') def test_bug_782369(self): - import sys - if hasattr(sys, "getrefcount"): - for i in range(10): - b = array.array('B', range(64)) - rc = sys.getrefcount(10) - for i in range(10): - b = array.array('B', range(64)) - self.assertEqual(rc, sys.getrefcount(10)) + for i in range(10): + b = array.array('B', range(64)) + rc = sys.getrefcount(10) + for i in range(10): + b = array.array('B', range(64)) + self.assertEqual(rc, sys.getrefcount(10)) def test_subclass_with_kwargs(self): # SF bug #1486663 -- this used to erroneously raise a TypeError diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py index 2122ade..ff2515d 100644 --- a/Lib/test/test_compileall.py +++ b/Lib/test/test_compileall.py @@ -40,11 +40,10 @@ class CompileallTests(unittest.TestCase): compare = struct.pack('<4sl', importlib.util.MAGIC_NUMBER, mtime) return data, compare + @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def recreation_check(self, metadata): """Check that compileall recreates bytecode when the new metadata is used.""" - if not hasattr(os, 'stat'): - return py_compile.compile(self.source_path) self.assertEqual(*self.data()) with open(self.bc_path, 'rb') as file: diff --git a/Lib/test/test_csv.py b/Lib/test/test_csv.py index 559e51f..cf0b09c 100644 --- a/Lib/test/test_csv.py +++ b/Lib/test/test_csv.py @@ -905,78 +905,77 @@ Stonecutters Seafood and Chop House+ Lemont+ IL+ 12/19/02+ Week Back dialect = sniffer.sniff(self.sample9) self.assertTrue(dialect.doublequote) -if not hasattr(sys, "gettotalrefcount"): - if support.verbose: print("*** skipping leakage tests ***") -else: - class NUL: - def write(s, *args): - pass - writelines = write - - class TestLeaks(unittest.TestCase): - def test_create_read(self): - delta = 0 - lastrc = sys.gettotalrefcount() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - csv.reader(["a,b,c\r\n"]) - csv.reader(["a,b,c\r\n"]) - csv.reader(["a,b,c\r\n"]) - delta = rc-lastrc - lastrc = rc - # if csv.reader() leaks, last delta should be 3 or more - self.assertEqual(delta < 3, True) - - def test_create_write(self): - delta = 0 - lastrc = sys.gettotalrefcount() - s = NUL() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - csv.writer(s) - csv.writer(s) - csv.writer(s) - delta = rc-lastrc - lastrc = rc - # if csv.writer() leaks, last delta should be 3 or more - self.assertEqual(delta < 3, True) - - def test_read(self): - delta = 0 - rows = ["a,b,c\r\n"]*5 - lastrc = sys.gettotalrefcount() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - rdr = csv.reader(rows) - for row in rdr: - pass - delta = rc-lastrc - lastrc = rc - # if reader leaks during read, delta should be 5 or more - self.assertEqual(delta < 5, True) - - def test_write(self): - delta = 0 - rows = [[1,2,3]]*5 - s = NUL() - lastrc = sys.gettotalrefcount() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - writer = csv.writer(s) - for row in rows: - writer.writerow(row) - delta = rc-lastrc - lastrc = rc - # if writer leaks during write, last delta should be 5 or more - self.assertEqual(delta < 5, True) +class NUL: + def write(s, *args): + pass + writelines = write + +@unittest.skipUnless(hasattr(sys, "gettotalrefcount"), + 'requires sys.gettotalrefcount()') +class TestLeaks(unittest.TestCase): + def test_create_read(self): + delta = 0 + lastrc = sys.gettotalrefcount() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + csv.reader(["a,b,c\r\n"]) + csv.reader(["a,b,c\r\n"]) + csv.reader(["a,b,c\r\n"]) + delta = rc-lastrc + lastrc = rc + # if csv.reader() leaks, last delta should be 3 or more + self.assertEqual(delta < 3, True) + + def test_create_write(self): + delta = 0 + lastrc = sys.gettotalrefcount() + s = NUL() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + csv.writer(s) + csv.writer(s) + csv.writer(s) + delta = rc-lastrc + lastrc = rc + # if csv.writer() leaks, last delta should be 3 or more + self.assertEqual(delta < 3, True) + + def test_read(self): + delta = 0 + rows = ["a,b,c\r\n"]*5 + lastrc = sys.gettotalrefcount() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + rdr = csv.reader(rows) + for row in rdr: + pass + delta = rc-lastrc + lastrc = rc + # if reader leaks during read, delta should be 5 or more + self.assertEqual(delta < 5, True) + + def test_write(self): + delta = 0 + rows = [[1,2,3]]*5 + s = NUL() + lastrc = sys.gettotalrefcount() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + writer = csv.writer(s) + for row in rows: + writer.writerow(row) + delta = rc-lastrc + lastrc = rc + # if writer leaks during write, last delta should be 5 or more + self.assertEqual(delta < 5, True) class TestUnicode(unittest.TestCase): diff --git a/Lib/test/test_dbm_dumb.py b/Lib/test/test_dbm_dumb.py index e233929..208bc4c 100644 --- a/Lib/test/test_dbm_dumb.py +++ b/Lib/test/test_dbm_dumb.py @@ -37,11 +37,9 @@ class DumbDBMTestCase(unittest.TestCase): self.read_helper(f) f.close() + @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') + @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()') def test_dumbdbm_creation_mode(self): - # On platforms without chmod, don't do anything. - if not (hasattr(os, 'chmod') and hasattr(os, 'umask')): - return - try: old_umask = os.umask(0o002) f = dumbdbm.open(_fname, 'c', 0o637) diff --git a/Lib/test/test_enumerate.py b/Lib/test/test_enumerate.py index a2d18d0..8742afc 100644 --- a/Lib/test/test_enumerate.py +++ b/Lib/test/test_enumerate.py @@ -202,11 +202,10 @@ class TestReversed(unittest.TestCase, PickleTest): self.assertRaises(TypeError, reversed) self.assertRaises(TypeError, reversed, [], 'extra') + @unittest.skipUnless(hasattr(sys, 'getrefcount'), 'test needs sys.getrefcount()') def test_bug1229429(self): # this bug was never in reversed, it was in # PyObject_CallMethod, and reversed_new calls that sometimes. - if not hasattr(sys, "getrefcount"): - return def f(): pass r = f.__reversed__ = object() diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index c0b537a..41463e2 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -16,7 +16,7 @@ try: except ImportError: ssl = None -from unittest import TestCase +from unittest import TestCase, skipUnless from test import support from test.support import HOST, HOSTv6 threading = support.import_module('threading') @@ -780,6 +780,7 @@ class TestFTPClass(TestCase): self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f) +@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") class TestIPv6Environment(TestCase): def setUp(self): @@ -820,6 +821,7 @@ class TestIPv6Environment(TestCase): retr() +@skipUnless(ssl, "SSL not available") class TestTLS_FTPClassMixin(TestFTPClass): """Repeat TestFTPClass tests starting the TLS layer for both control and data connections first. @@ -835,6 +837,7 @@ class TestTLS_FTPClassMixin(TestFTPClass): self.client.prot_p() +@skipUnless(ssl, "SSL not available") class TestTLS_FTPClass(TestCase): """Specific TLS_FTP class tests.""" @@ -1027,12 +1030,9 @@ class TestNetrcDeprecation(TestCase): def test_main(): - tests = [TestFTPClass, TestTimeouts, TestNetrcDeprecation] - if support.IPV6_ENABLED: - tests.append(TestIPv6Environment) - - if ssl is not None: - tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass]) + tests = [TestFTPClass, TestTimeouts, TestNetrcDeprecation, + TestIPv6Environment, + TestTLS_FTPClassMixin, TestTLS_FTPClass] thread_info = support.threading_setup() try: diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index 6c679ce..78e2a30 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -868,10 +868,10 @@ class TestMaildir(TestMailbox, unittest.TestCase): for msg in self._box: pass + @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') + @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def test_file_permissions(self): # Verify that message files are created without execute permissions - if not hasattr(os, "stat") or not hasattr(os, "umask"): - return msg = mailbox.MaildirMessage(self._template % 0) orig_umask = os.umask(0) try: @@ -882,12 +882,11 @@ class TestMaildir(TestMailbox, unittest.TestCase): mode = os.stat(path).st_mode self.assertFalse(mode & 0o111) + @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') + @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def test_folder_file_perms(self): # From bug #3228, we want to verify that the file created inside a Maildir # subfolder isn't marked as executable. - if not hasattr(os, "stat") or not hasattr(os, "umask"): - return - orig_umask = os.umask(0) try: subfolder = self._box.add_folder('subfolder') @@ -1097,24 +1096,25 @@ class TestMbox(_TestMboxMMDF, unittest.TestCase): _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) + @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') + @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def test_file_perms(self): # From bug #3228, we want to verify that the mailbox file isn't executable, # even if the umask is set to something that would leave executable bits set. # We only run this test on platforms that support umask. - if hasattr(os, 'umask') and hasattr(os, 'stat'): - try: - old_umask = os.umask(0o077) - self._box.close() - os.unlink(self._path) - self._box = mailbox.mbox(self._path, create=True) - self._box.add('') - self._box.close() - finally: - os.umask(old_umask) + try: + old_umask = os.umask(0o077) + self._box.close() + os.unlink(self._path) + self._box = mailbox.mbox(self._path, create=True) + self._box.add('') + self._box.close() + finally: + os.umask(old_umask) - st = os.stat(self._path) - perms = st.st_mode - self.assertFalse((perms & 0o111)) # Execute bits should all be off. + st = os.stat(self._path) + perms = st.st_mode + self.assertFalse((perms & 0o111)) # Execute bits should all be off. def test_terminating_newline(self): message = email.message.Message() diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py index 715003a..48f84ba 100644 --- a/Lib/test/test_math.py +++ b/Lib/test/test_math.py @@ -980,38 +980,37 @@ class MathTests(unittest.TestCase): # still fails this part of the test on some platforms. For now, we only # *run* test_exceptions() in verbose mode, so that this isn't normally # tested. + @unittest.skipUnless(verbose, 'requires verbose mode') + def test_exceptions(self): + try: + x = math.exp(-1000000000) + except: + # mathmodule.c is failing to weed out underflows from libm, or + # we've got an fp format with huge dynamic range + self.fail("underflowing exp() should not have raised " + "an exception") + if x != 0: + self.fail("underflowing exp() should have returned 0") + + # If this fails, probably using a strict IEEE-754 conforming libm, and x + # is +Inf afterwards. But Python wants overflows detected by default. + try: + x = math.exp(1000000000) + except OverflowError: + pass + else: + self.fail("overflowing exp() didn't trigger OverflowError") - if verbose: - def test_exceptions(self): - try: - x = math.exp(-1000000000) - except: - # mathmodule.c is failing to weed out underflows from libm, or - # we've got an fp format with huge dynamic range - self.fail("underflowing exp() should not have raised " - "an exception") - if x != 0: - self.fail("underflowing exp() should have returned 0") - - # If this fails, probably using a strict IEEE-754 conforming libm, and x - # is +Inf afterwards. But Python wants overflows detected by default. - try: - x = math.exp(1000000000) - except OverflowError: - pass - else: - self.fail("overflowing exp() didn't trigger OverflowError") - - # If this fails, it could be a puzzle. One odd possibility is that - # mathmodule.c's macros are getting confused while comparing - # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE - # as a result (and so raising OverflowError instead). - try: - x = math.sqrt(-1.0) - except ValueError: - pass - else: - self.fail("sqrt(-1) didn't raise ValueError") + # If this fails, it could be a puzzle. One odd possibility is that + # mathmodule.c's macros are getting confused while comparing + # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE + # as a result (and so raising OverflowError instead). + try: + x = math.sqrt(-1.0) + except ValueError: + pass + else: + self.fail("sqrt(-1) didn't raise ValueError") @requires_IEEE_754 def test_testfile(self): diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py index b1cc973..6ca5e1b 100644 --- a/Lib/test/test_mmap.py +++ b/Lib/test/test_mmap.py @@ -315,26 +315,25 @@ class MmapTests(unittest.TestCase): mf.close() f.close() + @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") def test_entire_file(self): # test mapping of entire file by passing 0 for map length - if hasattr(os, "stat"): - f = open(TESTFN, "wb+") + f = open(TESTFN, "wb+") - f.write(2**16 * b'm') # Arbitrary character - f.close() + f.write(2**16 * b'm') # Arbitrary character + f.close() - f = open(TESTFN, "rb+") - mf = mmap.mmap(f.fileno(), 0) - self.assertEqual(len(mf), 2**16, "Map size should equal file size.") - self.assertEqual(mf.read(2**16), 2**16 * b"m") - mf.close() - f.close() + f = open(TESTFN, "rb+") + mf = mmap.mmap(f.fileno(), 0) + self.assertEqual(len(mf), 2**16, "Map size should equal file size.") + self.assertEqual(mf.read(2**16), 2**16 * b"m") + mf.close() + f.close() + @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") def test_length_0_offset(self): # Issue #10916: test mapping of remainder of file by passing 0 for # map length with an offset doesn't cause a segfault. - if not hasattr(os, "stat"): - self.skipTest("needs os.stat") # NOTE: allocation granularity is currently 65536 under Win64, # and therefore the minimum offset alignment. with open(TESTFN, "wb") as f: @@ -344,12 +343,10 @@ class MmapTests(unittest.TestCase): with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf: self.assertRaises(IndexError, mf.__getitem__, 80000) + @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()") def test_length_0_large_offset(self): # Issue #10959: test mapping of a file by passing 0 for # map length with a large offset doesn't cause a segfault. - if not hasattr(os, "stat"): - self.skipTest("needs os.stat") - with open(TESTFN, "wb") as f: f.write(115699 * b'm') # Arbitrary character @@ -561,9 +558,8 @@ class MmapTests(unittest.TestCase): return mmap.mmap.__new__(klass, -1, *args, **kwargs) anon_mmap(PAGESIZE) + @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ") def test_prot_readonly(self): - if not hasattr(mmap, 'PROT_READ'): - return mapsize = 10 with open(TESTFN, "wb") as fp: fp.write(b"a"*mapsize) @@ -617,67 +613,69 @@ class MmapTests(unittest.TestCase): self.assertEqual(m.read_byte(), b) m.close() - if os.name == 'nt': - def test_tagname(self): - data1 = b"0123456789" - data2 = b"abcdefghij" - assert len(data1) == len(data2) - - # Test same tag - m1 = mmap.mmap(-1, len(data1), tagname="foo") - m1[:] = data1 - m2 = mmap.mmap(-1, len(data2), tagname="foo") - m2[:] = data2 - self.assertEqual(m1[:], data2) - self.assertEqual(m2[:], data2) - m2.close() - m1.close() - - # Test different tag - m1 = mmap.mmap(-1, len(data1), tagname="foo") - m1[:] = data1 - m2 = mmap.mmap(-1, len(data2), tagname="boo") - m2[:] = data2 - self.assertEqual(m1[:], data1) - self.assertEqual(m2[:], data2) - m2.close() - m1.close() - - def test_crasher_on_windows(self): - # Should not crash (Issue 1733986) - m = mmap.mmap(-1, 1000, tagname="foo") - try: - mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size - except: - pass - m.close() + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_tagname(self): + data1 = b"0123456789" + data2 = b"abcdefghij" + assert len(data1) == len(data2) + + # Test same tag + m1 = mmap.mmap(-1, len(data1), tagname="foo") + m1[:] = data1 + m2 = mmap.mmap(-1, len(data2), tagname="foo") + m2[:] = data2 + self.assertEqual(m1[:], data2) + self.assertEqual(m2[:], data2) + m2.close() + m1.close() + + # Test different tag + m1 = mmap.mmap(-1, len(data1), tagname="foo") + m1[:] = data1 + m2 = mmap.mmap(-1, len(data2), tagname="boo") + m2[:] = data2 + self.assertEqual(m1[:], data1) + self.assertEqual(m2[:], data2) + m2.close() + m1.close() + + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_crasher_on_windows(self): + # Should not crash (Issue 1733986) + m = mmap.mmap(-1, 1000, tagname="foo") + try: + mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size + except: + pass + m.close() - # Should not crash (Issue 5385) - with open(TESTFN, "wb") as fp: - fp.write(b"x"*10) - f = open(TESTFN, "r+b") - m = mmap.mmap(f.fileno(), 0) - f.close() - try: - m.resize(0) # will raise OSError - except: - pass - try: - m[:] - except: - pass - m.close() + # Should not crash (Issue 5385) + with open(TESTFN, "wb") as fp: + fp.write(b"x"*10) + f = open(TESTFN, "r+b") + m = mmap.mmap(f.fileno(), 0) + f.close() + try: + m.resize(0) # will raise OSError + except: + pass + try: + m[:] + except: + pass + m.close() - def test_invalid_descriptor(self): - # socket file descriptors are valid, but out of range - # for _get_osfhandle, causing a crash when validating the - # parameters to _get_osfhandle. - s = socket.socket() - try: - with self.assertRaises(OSError): - m = mmap.mmap(s.fileno(), 10) - finally: - s.close() + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_invalid_descriptor(self): + # socket file descriptors are valid, but out of range + # for _get_osfhandle, causing a crash when validating the + # parameters to _get_osfhandle. + s = socket.socket() + try: + with self.assertRaises(OSError): + m = mmap.mmap(s.fileno(), 10) + finally: + s.close() def test_context_manager(self): with mmap.mmap(-1, 10) as m: diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py index d00c9db..71a4ec0 100644 --- a/Lib/test/test_nntplib.py +++ b/Lib/test/test_nntplib.py @@ -6,10 +6,12 @@ import unittest import functools import contextlib from test import support -from nntplib import NNTP, GroupInfo, _have_ssl +from nntplib import NNTP, GroupInfo import nntplib -if _have_ssl: +try: import ssl +except ImportError: + ssl = None TIMEOUT = 30 @@ -199,23 +201,23 @@ class NetworkedNNTPTestsMixin: resp, caps = self.server.capabilities() _check_caps(caps) - if _have_ssl: - def test_starttls(self): - file = self.server.file - sock = self.server.sock - try: - self.server.starttls() - except nntplib.NNTPPermanentError: - self.skipTest("STARTTLS not supported by server.") - else: - # Check that the socket and internal pseudo-file really were - # changed. - self.assertNotEqual(file, self.server.file) - self.assertNotEqual(sock, self.server.sock) - # Check that the new socket really is an SSL one - self.assertIsInstance(self.server.sock, ssl.SSLSocket) - # Check that trying starttls when it's already active fails. - self.assertRaises(ValueError, self.server.starttls) + @unittest.skipUnless(ssl, 'requires SSL support') + def test_starttls(self): + file = self.server.file + sock = self.server.sock + try: + self.server.starttls() + except nntplib.NNTPPermanentError: + self.skipTest("STARTTLS not supported by server.") + else: + # Check that the socket and internal pseudo-file really were + # changed. + self.assertNotEqual(file, self.server.file) + self.assertNotEqual(sock, self.server.sock) + # Check that the new socket really is an SSL one + self.assertIsInstance(self.server.sock, ssl.SSLSocket) + # Check that trying starttls when it's already active fails. + self.assertRaises(ValueError, self.server.starttls) def test_zlogin(self): # This test must be the penultimate because further commands will be @@ -300,25 +302,24 @@ class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): if cls.server is not None: cls.server.quit() +@unittest.skipUnless(ssl, 'requires SSL support') +class NetworkedNNTP_SSLTests(NetworkedNNTPTests): -if _have_ssl: - class NetworkedNNTP_SSLTests(NetworkedNNTPTests): - - # Technical limits for this public NNTP server (see http://www.aioe.org): - # "Only two concurrent connections per IP address are allowed and - # 400 connections per day are accepted from each IP address." + # Technical limits for this public NNTP server (see http://www.aioe.org): + # "Only two concurrent connections per IP address are allowed and + # 400 connections per day are accepted from each IP address." - NNTP_HOST = 'nntp.aioe.org' - GROUP_NAME = 'comp.lang.python' - GROUP_PAT = 'comp.lang.*' + NNTP_HOST = 'nntp.aioe.org' + GROUP_NAME = 'comp.lang.python' + GROUP_PAT = 'comp.lang.*' - NNTP_CLASS = nntplib.NNTP_SSL + NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None) - # Disabled as it produces too much data - test_list = None + # Disabled as it produces too much data + test_list = None - # Disabled as the connection will already be encrypted. - test_starttls = None + # Disabled as the connection will already be encrypted. + test_starttls = None # @@ -1407,12 +1408,13 @@ class MiscTests(unittest.TestCase): gives(2000, 6, 23, "000623", "000000") gives(2010, 6, 5, "100605", "000000") + @unittest.skipUnless(ssl, 'requires SSL support') + def test_ssl_support(self): + self.assertTrue(hasattr(nntplib, 'NNTP_SSL')) def test_main(): tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests, - SendReaderNNTPv2Tests, NetworkedNNTPTests] - if _have_ssl: - tests.append(NetworkedNNTP_SSLTests) + SendReaderNNTPv2Tests, NetworkedNNTPTests, NetworkedNNTP_SSLTests] support.run_unittest(*tests) diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 81a4d9a..1ffa7da 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -185,10 +185,8 @@ class StatAttributeTests(unittest.TestCase): os.unlink(self.fname) os.rmdir(support.TESTFN) + @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def check_stat_attributes(self, fname): - if not hasattr(os, "stat"): - return - result = os.stat(fname) # Make sure direct access works @@ -272,10 +270,8 @@ class StatAttributeTests(unittest.TestCase): unpickled = pickle.loads(p) self.assertEqual(result, unpickled) + @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') def test_statvfs_attributes(self): - if not hasattr(os, "statvfs"): - return - try: result = os.statvfs(self.fname) except OSError as e: @@ -479,10 +475,10 @@ class StatAttributeTests(unittest.TestCase): os.close(dirfd) self._test_utime_subsecond(set_time) - # Restrict test to Win32, since there is no guarantee other + # Restrict tests to Win32, since there is no guarantee other # systems support centiseconds - if sys.platform == 'win32': - def get_file_system(path): + def get_file_system(path): + if sys.platform == 'win32': root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' import ctypes kernel32 = ctypes.windll.kernel32 @@ -490,38 +486,45 @@ class StatAttributeTests(unittest.TestCase): if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)): return buf.value - if get_file_system(support.TESTFN) == "NTFS": - def test_1565150(self): - t1 = 1159195039.25 - os.utime(self.fname, (t1, t1)) - self.assertEqual(os.stat(self.fname).st_mtime, t1) - - def test_large_time(self): - t1 = 5000000000 # some day in 2128 - os.utime(self.fname, (t1, t1)) - self.assertEqual(os.stat(self.fname).st_mtime, t1) - - def test_1686475(self): - # Verify that an open file can be stat'ed - try: - os.stat(r"c:\pagefile.sys") - except FileNotFoundError: - pass # file does not exist; cannot run test - except OSError as e: - self.fail("Could not stat pagefile.sys") + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", + "requires NTFS") + def test_1565150(self): + t1 = 1159195039.25 + os.utime(self.fname, (t1, t1)) + self.assertEqual(os.stat(self.fname).st_mtime, t1) + + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", + "requires NTFS") + def test_large_time(self): + t1 = 5000000000 # some day in 2128 + os.utime(self.fname, (t1, t1)) + self.assertEqual(os.stat(self.fname).st_mtime, t1) + + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + def test_1686475(self): + # Verify that an open file can be stat'ed + try: + os.stat(r"c:\pagefile.sys") + except FileNotFoundError: + pass # file does not exist; cannot run test + except OSError as e: + self.fail("Could not stat pagefile.sys") - @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") - def test_15261(self): - # Verify that stat'ing a closed fd does not cause crash - r, w = os.pipe() - try: - os.stat(r) # should not raise error - finally: - os.close(r) - os.close(w) - with self.assertRaises(OSError) as ctx: - os.stat(r) - self.assertEqual(ctx.exception.errno, errno.EBADF) + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") + def test_15261(self): + # Verify that stat'ing a closed fd does not cause crash + r, w = os.pipe() + try: + os.stat(r) # should not raise error + finally: + os.close(r) + os.close(w) + with self.assertRaises(OSError) as ctx: + os.stat(r) + self.assertEqual(ctx.exception.errno, errno.EBADF) from test import mapping_tests @@ -1167,6 +1170,7 @@ class ExecTests(unittest.TestCase): self._test_internal_execvpe(bytes) +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32ErrorTests(unittest.TestCase): def test_rename(self): self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") @@ -1213,63 +1217,63 @@ class TestInvalidFD(unittest.TestCase): self.fail("%r didn't raise a OSError with a bad file descriptor" % f) + @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') def test_isatty(self): - if hasattr(os, "isatty"): - self.assertEqual(os.isatty(support.make_bad_fd()), False) + self.assertEqual(os.isatty(support.make_bad_fd()), False) + @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') def test_closerange(self): - if hasattr(os, "closerange"): - fd = support.make_bad_fd() - # Make sure none of the descriptors we are about to close are - # currently valid (issue 6542). - for i in range(10): - try: os.fstat(fd+i) - except OSError: - pass - else: - break - if i < 2: - raise unittest.SkipTest( - "Unable to acquire a range of invalid file descriptors") - self.assertEqual(os.closerange(fd, fd + i-1), None) + fd = support.make_bad_fd() + # Make sure none of the descriptors we are about to close are + # currently valid (issue 6542). + for i in range(10): + try: os.fstat(fd+i) + except OSError: + pass + else: + break + if i < 2: + raise unittest.SkipTest( + "Unable to acquire a range of invalid file descriptors") + self.assertEqual(os.closerange(fd, fd + i-1), None) + @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') def test_dup2(self): - if hasattr(os, "dup2"): - self.check(os.dup2, 20) + self.check(os.dup2, 20) + @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') def test_fchmod(self): - if hasattr(os, "fchmod"): - self.check(os.fchmod, 0) + self.check(os.fchmod, 0) + @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') def test_fchown(self): - if hasattr(os, "fchown"): - self.check(os.fchown, -1, -1) + self.check(os.fchown, -1, -1) + @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') def test_fpathconf(self): - if hasattr(os, "fpathconf"): - self.check(os.pathconf, "PC_NAME_MAX") - self.check(os.fpathconf, "PC_NAME_MAX") + self.check(os.pathconf, "PC_NAME_MAX") + self.check(os.fpathconf, "PC_NAME_MAX") + @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') def test_ftruncate(self): - if hasattr(os, "ftruncate"): - self.check(os.truncate, 0) - self.check(os.ftruncate, 0) + self.check(os.truncate, 0) + self.check(os.ftruncate, 0) + @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') def test_lseek(self): - if hasattr(os, "lseek"): - self.check(os.lseek, 0, 0) + self.check(os.lseek, 0, 0) + @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') def test_read(self): - if hasattr(os, "read"): - self.check(os.read, 1) + self.check(os.read, 1) + @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') def test_tcsetpgrpt(self): - if hasattr(os, "tcsetpgrp"): - self.check(os.tcsetpgrp, 0) + self.check(os.tcsetpgrp, 0) + @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') def test_write(self): - if hasattr(os, "write"): - self.check(os.write, b" ") + self.check(os.write, b" ") class LinkTests(unittest.TestCase): @@ -1309,138 +1313,133 @@ class LinkTests(unittest.TestCase): self.file2 = self.file1 + "2" self._test_link(self.file1, self.file2) -if sys.platform != 'win32': - class Win32ErrorTests(unittest.TestCase): - pass - - class PosixUidGidTests(unittest.TestCase): - if hasattr(os, 'setuid'): - def test_setuid(self): - if os.getuid() != 0: - self.assertRaises(OSError, os.setuid, 0) - self.assertRaises(OverflowError, os.setuid, 1<<32) - - if hasattr(os, 'setgid'): - def test_setgid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(OSError, os.setgid, 0) - self.assertRaises(OverflowError, os.setgid, 1<<32) - - if hasattr(os, 'seteuid'): - def test_seteuid(self): - if os.getuid() != 0: - self.assertRaises(OSError, os.seteuid, 0) - self.assertRaises(OverflowError, os.seteuid, 1<<32) - - if hasattr(os, 'setegid'): - def test_setegid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(OSError, os.setegid, 0) - self.assertRaises(OverflowError, os.setegid, 1<<32) - - if hasattr(os, 'setreuid'): - def test_setreuid(self): - if os.getuid() != 0: - self.assertRaises(OSError, os.setreuid, 0, 0) - self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) - self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) - - def test_setreuid_neg1(self): - # Needs to accept -1. We run this in a subprocess to avoid - # altering the test runner's process state (issue8045). - subprocess.check_call([ - sys.executable, '-c', - 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) - - if hasattr(os, 'setregid'): - def test_setregid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(OSError, os.setregid, 0, 0) - self.assertRaises(OverflowError, os.setregid, 1<<32, 0) - self.assertRaises(OverflowError, os.setregid, 0, 1<<32) - - def test_setregid_neg1(self): - # Needs to accept -1. We run this in a subprocess to avoid - # altering the test runner's process state (issue8045). - subprocess.check_call([ - sys.executable, '-c', - 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) - - class Pep383Tests(unittest.TestCase): - def setUp(self): - if support.TESTFN_UNENCODABLE: - self.dir = support.TESTFN_UNENCODABLE - elif support.TESTFN_NONASCII: - self.dir = support.TESTFN_NONASCII - else: - self.dir = support.TESTFN - self.bdir = os.fsencode(self.dir) +@unittest.skipIf(sys.platform == "win32", "Posix specific tests") +class PosixUidGidTests(unittest.TestCase): + @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') + def test_setuid(self): + if os.getuid() != 0: + self.assertRaises(OSError, os.setuid, 0) + self.assertRaises(OverflowError, os.setuid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') + def test_setgid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(OSError, os.setgid, 0) + self.assertRaises(OverflowError, os.setgid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') + def test_seteuid(self): + if os.getuid() != 0: + self.assertRaises(OSError, os.seteuid, 0) + self.assertRaises(OverflowError, os.seteuid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') + def test_setegid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(OSError, os.setegid, 0) + self.assertRaises(OverflowError, os.setegid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') + def test_setreuid(self): + if os.getuid() != 0: + self.assertRaises(OSError, os.setreuid, 0, 0) + self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) + self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') + def test_setreuid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) + + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') + def test_setregid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(OSError, os.setregid, 0, 0) + self.assertRaises(OverflowError, os.setregid, 1<<32, 0) + self.assertRaises(OverflowError, os.setregid, 0, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') + def test_setregid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) + +@unittest.skipIf(sys.platform == "win32", "Posix specific tests") +class Pep383Tests(unittest.TestCase): + def setUp(self): + if support.TESTFN_UNENCODABLE: + self.dir = support.TESTFN_UNENCODABLE + elif support.TESTFN_NONASCII: + self.dir = support.TESTFN_NONASCII + else: + self.dir = support.TESTFN + self.bdir = os.fsencode(self.dir) - bytesfn = [] - def add_filename(fn): - try: - fn = os.fsencode(fn) - except UnicodeEncodeError: - return - bytesfn.append(fn) - add_filename(support.TESTFN_UNICODE) - if support.TESTFN_UNENCODABLE: - add_filename(support.TESTFN_UNENCODABLE) - if support.TESTFN_NONASCII: - add_filename(support.TESTFN_NONASCII) - if not bytesfn: - self.skipTest("couldn't create any non-ascii filename") - - self.unicodefn = set() - os.mkdir(self.dir) + bytesfn = [] + def add_filename(fn): try: - for fn in bytesfn: - support.create_empty_file(os.path.join(self.bdir, fn)) - fn = os.fsdecode(fn) - if fn in self.unicodefn: - raise ValueError("duplicate filename") - self.unicodefn.add(fn) - except: - shutil.rmtree(self.dir) - raise - - def tearDown(self): + fn = os.fsencode(fn) + except UnicodeEncodeError: + return + bytesfn.append(fn) + add_filename(support.TESTFN_UNICODE) + if support.TESTFN_UNENCODABLE: + add_filename(support.TESTFN_UNENCODABLE) + if support.TESTFN_NONASCII: + add_filename(support.TESTFN_NONASCII) + if not bytesfn: + self.skipTest("couldn't create any non-ascii filename") + + self.unicodefn = set() + os.mkdir(self.dir) + try: + for fn in bytesfn: + support.create_empty_file(os.path.join(self.bdir, fn)) + fn = os.fsdecode(fn) + if fn in self.unicodefn: + raise ValueError("duplicate filename") + self.unicodefn.add(fn) + except: shutil.rmtree(self.dir) + raise - def test_listdir(self): - expected = self.unicodefn - found = set(os.listdir(self.dir)) - self.assertEqual(found, expected) - # test listdir without arguments - current_directory = os.getcwd() - try: - os.chdir(os.sep) - self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) - finally: - os.chdir(current_directory) - - def test_open(self): - for fn in self.unicodefn: - f = open(os.path.join(self.dir, fn), 'rb') - f.close() - - @unittest.skipUnless(hasattr(os, 'statvfs'), - "need os.statvfs()") - def test_statvfs(self): - # issue #9645 - for fn in self.unicodefn: - # should not fail with file not found error - fullname = os.path.join(self.dir, fn) - os.statvfs(fullname) - - def test_stat(self): - for fn in self.unicodefn: - os.stat(os.path.join(self.dir, fn)) -else: - class PosixUidGidTests(unittest.TestCase): - pass - class Pep383Tests(unittest.TestCase): - pass + def tearDown(self): + shutil.rmtree(self.dir) + + def test_listdir(self): + expected = self.unicodefn + found = set(os.listdir(self.dir)) + self.assertEqual(found, expected) + # test listdir without arguments + current_directory = os.getcwd() + try: + os.chdir(os.sep) + self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) + finally: + os.chdir(current_directory) + + def test_open(self): + for fn in self.unicodefn: + f = open(os.path.join(self.dir, fn), 'rb') + f.close() + + @unittest.skipUnless(hasattr(os, 'statvfs'), + "need os.statvfs()") + def test_statvfs(self): + # issue #9645 + for fn in self.unicodefn: + # should not fail with file not found error + fullname = os.path.join(self.dir, fn) + os.statvfs(fullname) + + def test_stat(self): + for fn in self.unicodefn: + os.stat(os.path.join(self.dir, fn)) @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32KillTests(unittest.TestCase): @@ -1924,6 +1923,8 @@ class TestSendfile(unittest.TestCase): SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ not sys.platform.startswith("solaris") and \ not sys.platform.startswith("sunos") + requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, + 'requires headers and trailers support') @classmethod def setUpClass(cls): @@ -2042,52 +2043,54 @@ class TestSendfile(unittest.TestCase): # --- headers / trailers tests - if SUPPORT_HEADERS_TRAILERS: - - def test_headers(self): - total_sent = 0 - sent = os.sendfile(self.sockno, self.fileno, 0, 4096, - headers=[b"x" * 512]) + @requires_headers_trailers + def test_headers(self): + total_sent = 0 + sent = os.sendfile(self.sockno, self.fileno, 0, 4096, + headers=[b"x" * 512]) + total_sent += sent + offset = 4096 + nbytes = 4096 + while 1: + sent = self.sendfile_wrapper(self.sockno, self.fileno, + offset, nbytes) + if sent == 0: + break total_sent += sent - offset = 4096 - nbytes = 4096 - while 1: - sent = self.sendfile_wrapper(self.sockno, self.fileno, - offset, nbytes) - if sent == 0: - break - total_sent += sent - offset += sent + offset += sent - expected_data = b"x" * 512 + self.DATA - self.assertEqual(total_sent, len(expected_data)) + expected_data = b"x" * 512 + self.DATA + self.assertEqual(total_sent, len(expected_data)) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(hash(data), hash(expected_data)) + + @requires_headers_trailers + def test_trailers(self): + TESTFN2 = support.TESTFN + "2" + file_data = b"abcdef" + with open(TESTFN2, 'wb') as f: + f.write(file_data) + with open(TESTFN2, 'rb')as f: + self.addCleanup(os.remove, TESTFN2) + os.sendfile(self.sockno, f.fileno(), 0, len(file_data), + trailers=[b"1234"]) self.client.close() self.server.wait() data = self.server.handler_instance.get_data() - self.assertEqual(hash(data), hash(expected_data)) - - def test_trailers(self): - TESTFN2 = support.TESTFN + "2" - file_data = b"abcdef" - with open(TESTFN2, 'wb') as f: - f.write(file_data) - with open(TESTFN2, 'rb')as f: - self.addCleanup(os.remove, TESTFN2) - os.sendfile(self.sockno, f.fileno(), 0, len(file_data), - trailers=[b"1234"]) - self.client.close() - self.server.wait() - data = self.server.handler_instance.get_data() - self.assertEqual(data, b"abcdef1234") - - if hasattr(os, "SF_NODISKIO"): - def test_flags(self): - try: - os.sendfile(self.sockno, self.fileno, 0, 4096, - flags=os.SF_NODISKIO) - except OSError as err: - if err.errno not in (errno.EBUSY, errno.EAGAIN): - raise + self.assertEqual(data, b"abcdef1234") + + @requires_headers_trailers + @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), + 'test needs os.SF_NODISKIO') + def test_flags(self): + try: + os.sendfile(self.sockno, self.fileno, 0, 4096, + flags=os.SF_NODISKIO) + except OSError as err: + if err.errno not in (errno.EBUSY, errno.EAGAIN): + raise def supports_extended_attributes(): diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index dd51ac9..70fe426 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -11,7 +11,7 @@ import os import time import errno -from unittest import TestCase +from unittest import TestCase, skipUnless from test import support as test_support threading = test_support.import_module('threading') @@ -24,6 +24,7 @@ if hasattr(poplib, 'POP3_SSL'): SUPPORTS_SSL = True CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem") +requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') # the dummy data returned by server when LIST and RETR commands are issued LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' @@ -316,22 +317,23 @@ class TestPOP3Class(TestCase): self.assertIsNone(self.client.sock) self.assertIsNone(self.client.file) - if SUPPORTS_SSL: - - def test_stls_capa(self): - capa = self.client.capa() - self.assertTrue('STLS' in capa.keys()) + @requires_ssl + def test_stls_capa(self): + capa = self.client.capa() + self.assertTrue('STLS' in capa.keys()) - def test_stls(self): - expected = b'+OK Begin TLS negotiation' - resp = self.client.stls() - self.assertEqual(resp, expected) + @requires_ssl + def test_stls(self): + expected = b'+OK Begin TLS negotiation' + resp = self.client.stls() + self.assertEqual(resp, expected) - def test_stls_context(self): - expected = b'+OK Begin TLS negotiation' - ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - resp = self.client.stls(context=ctx) - self.assertEqual(resp, expected) + @requires_ssl + def test_stls_context(self): + expected = b'+OK Begin TLS negotiation' + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + resp = self.client.stls(context=ctx) + self.assertEqual(resp, expected) if SUPPORTS_SSL: @@ -354,73 +356,75 @@ if SUPPORTS_SSL: self.push('+OK dummy pop3 server ready. <timestamp>') - class TestPOP3_SSLClass(TestPOP3Class): - # repeat previous tests by using poplib.POP3_SSL - - def setUp(self): - self.server = DummyPOP3Server((HOST, PORT)) - self.server.handler = DummyPOP3_SSLHandler - self.server.start() - self.client = poplib.POP3_SSL(self.server.host, self.server.port) - - def test__all__(self): - self.assertIn('POP3_SSL', poplib.__all__) - - def test_context(self): - ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, keyfile=CERTFILE, context=ctx) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, certfile=CERTFILE, context=ctx) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, keyfile=CERTFILE, - certfile=CERTFILE, context=ctx) +@requires_ssl +class TestPOP3_SSLClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3_SSL - self.client.quit() - self.client = poplib.POP3_SSL(self.server.host, self.server.port, - context=ctx) - self.assertIsInstance(self.client.sock, ssl.SSLSocket) - self.assertIs(self.client.sock.context, ctx) - self.assertTrue(self.client.noop().startswith(b'+OK')) - - def test_stls(self): - self.assertRaises(poplib.error_proto, self.client.stls) - - test_stls_context = test_stls - - def test_stls_capa(self): - capa = self.client.capa() - self.assertFalse('STLS' in capa.keys()) + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.handler = DummyPOP3_SSLHandler + self.server.start() + self.client = poplib.POP3_SSL(self.server.host, self.server.port) + + def test__all__(self): + self.assertIn('POP3_SSL', poplib.__all__) + + def test_context(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, + self.server.port, keyfile=CERTFILE, context=ctx) + self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, + self.server.port, certfile=CERTFILE, context=ctx) + self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, + self.server.port, keyfile=CERTFILE, + certfile=CERTFILE, context=ctx) + + self.client.quit() + self.client = poplib.POP3_SSL(self.server.host, self.server.port, + context=ctx) + self.assertIsInstance(self.client.sock, ssl.SSLSocket) + self.assertIs(self.client.sock.context, ctx) + self.assertTrue(self.client.noop().startswith(b'+OK')) + + def test_stls(self): + self.assertRaises(poplib.error_proto, self.client.stls) + + test_stls_context = test_stls + + def test_stls_capa(self): + capa = self.client.capa() + self.assertFalse('STLS' in capa.keys()) - class TestPOP3_TLSClass(TestPOP3Class): - # repeat previous tests by using poplib.POP3.stls() +@requires_ssl +class TestPOP3_TLSClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3.stls() - def setUp(self): - self.server = DummyPOP3Server((HOST, PORT)) - self.server.start() - self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) - self.client.stls() + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.start() + self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) + self.client.stls() - def tearDown(self): - if self.client.file is not None and self.client.sock is not None: - try: - self.client.quit() - except poplib.error_proto: - # happens in the test_too_long_lines case; the overlong - # response will be treated as response to QUIT and raise - # this exception - pass - self.server.stop() + def tearDown(self): + if self.client.file is not None and self.client.sock is not None: + try: + self.client.quit() + except poplib.error_proto: + # happens in the test_too_long_lines case; the overlong + # response will be treated as response to QUIT and raise + # this exception + pass + self.server.stop() - def test_stls(self): - self.assertRaises(poplib.error_proto, self.client.stls) + def test_stls(self): + self.assertRaises(poplib.error_proto, self.client.stls) - test_stls_context = test_stls + test_stls_context = test_stls - def test_stls_capa(self): - capa = self.client.capa() - self.assertFalse(b'STLS' in capa.keys()) + def test_stls_capa(self): + capa = self.client.capa() + self.assertFalse(b'STLS' in capa.keys()) class TestTimeouts(TestCase): @@ -478,10 +482,8 @@ class TestTimeouts(TestCase): def test_main(): - tests = [TestPOP3Class, TestTimeouts] - if SUPPORTS_SSL: - tests.append(TestPOP3_SSLClass) - tests.append(TestPOP3_TLSClass) + tests = [TestPOP3Class, TestTimeouts, + TestPOP3_SSLClass, TestPOP3_TLSClass] thread_info = test_support.threading_setup() try: test_support.run_unittest(*tests) diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 6cd3393..1eceebe 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -54,47 +54,55 @@ class PosixTester(unittest.TestCase): posix_func() self.assertRaises(TypeError, posix_func, 1) - if hasattr(posix, 'getresuid'): - def test_getresuid(self): - user_ids = posix.getresuid() - self.assertEqual(len(user_ids), 3) - for val in user_ids: - self.assertGreaterEqual(val, 0) - - if hasattr(posix, 'getresgid'): - def test_getresgid(self): - group_ids = posix.getresgid() - self.assertEqual(len(group_ids), 3) - for val in group_ids: - self.assertGreaterEqual(val, 0) - - if hasattr(posix, 'setresuid'): - def test_setresuid(self): - current_user_ids = posix.getresuid() - self.assertIsNone(posix.setresuid(*current_user_ids)) - # -1 means don't change that value. - self.assertIsNone(posix.setresuid(-1, -1, -1)) - - def test_setresuid_exception(self): - # Don't do this test if someone is silly enough to run us as root. - current_user_ids = posix.getresuid() - if 0 not in current_user_ids: - new_user_ids = (current_user_ids[0]+1, -1, -1) - self.assertRaises(OSError, posix.setresuid, *new_user_ids) - - if hasattr(posix, 'setresgid'): - def test_setresgid(self): - current_group_ids = posix.getresgid() - self.assertIsNone(posix.setresgid(*current_group_ids)) - # -1 means don't change that value. - self.assertIsNone(posix.setresgid(-1, -1, -1)) - - def test_setresgid_exception(self): - # Don't do this test if someone is silly enough to run us as root. - current_group_ids = posix.getresgid() - if 0 not in current_group_ids: - new_group_ids = (current_group_ids[0]+1, -1, -1) - self.assertRaises(OSError, posix.setresgid, *new_group_ids) + @unittest.skipUnless(hasattr(posix, 'getresuid'), + 'test needs posix.getresuid()') + def test_getresuid(self): + user_ids = posix.getresuid() + self.assertEqual(len(user_ids), 3) + for val in user_ids: + self.assertGreaterEqual(val, 0) + + @unittest.skipUnless(hasattr(posix, 'getresgid'), + 'test needs posix.getresgid()') + def test_getresgid(self): + group_ids = posix.getresgid() + self.assertEqual(len(group_ids), 3) + for val in group_ids: + self.assertGreaterEqual(val, 0) + + @unittest.skipUnless(hasattr(posix, 'setresuid'), + 'test needs posix.setresuid()') + def test_setresuid(self): + current_user_ids = posix.getresuid() + self.assertIsNone(posix.setresuid(*current_user_ids)) + # -1 means don't change that value. + self.assertIsNone(posix.setresuid(-1, -1, -1)) + + @unittest.skipUnless(hasattr(posix, 'setresuid'), + 'test needs posix.setresuid()') + def test_setresuid_exception(self): + # Don't do this test if someone is silly enough to run us as root. + current_user_ids = posix.getresuid() + if 0 not in current_user_ids: + new_user_ids = (current_user_ids[0]+1, -1, -1) + self.assertRaises(OSError, posix.setresuid, *new_user_ids) + + @unittest.skipUnless(hasattr(posix, 'setresgid'), + 'test needs posix.setresgid()') + def test_setresgid(self): + current_group_ids = posix.getresgid() + self.assertIsNone(posix.setresgid(*current_group_ids)) + # -1 means don't change that value. + self.assertIsNone(posix.setresgid(-1, -1, -1)) + + @unittest.skipUnless(hasattr(posix, 'setresgid'), + 'test needs posix.setresgid()') + def test_setresgid_exception(self): + # Don't do this test if someone is silly enough to run us as root. + current_group_ids = posix.getresgid() + if 0 not in current_group_ids: + new_group_ids = (current_group_ids[0]+1, -1, -1) + self.assertRaises(OSError, posix.setresgid, *new_group_ids) @unittest.skipUnless(hasattr(posix, 'initgroups'), "test needs os.initgroups()") @@ -121,29 +129,32 @@ class PosixTester(unittest.TestCase): else: self.fail("Expected OSError to be raised by initgroups") + @unittest.skipUnless(hasattr(posix, 'statvfs'), + 'test needs posix.statvfs()') def test_statvfs(self): - if hasattr(posix, 'statvfs'): - self.assertTrue(posix.statvfs(os.curdir)) + self.assertTrue(posix.statvfs(os.curdir)) + @unittest.skipUnless(hasattr(posix, 'fstatvfs'), + 'test needs posix.fstatvfs()') def test_fstatvfs(self): - if hasattr(posix, 'fstatvfs'): - fp = open(support.TESTFN) - try: - self.assertTrue(posix.fstatvfs(fp.fileno())) - self.assertTrue(posix.statvfs(fp.fileno())) - finally: - fp.close() + fp = open(support.TESTFN) + try: + self.assertTrue(posix.fstatvfs(fp.fileno())) + self.assertTrue(posix.statvfs(fp.fileno())) + finally: + fp.close() + @unittest.skipUnless(hasattr(posix, 'ftruncate'), + 'test needs posix.ftruncate()') def test_ftruncate(self): - if hasattr(posix, 'ftruncate'): - fp = open(support.TESTFN, 'w+') - try: - # we need to have some data to truncate - fp.write('test') - fp.flush() - posix.ftruncate(fp.fileno(), 0) - finally: - fp.close() + fp = open(support.TESTFN, 'w+') + try: + # we need to have some data to truncate + fp.write('test') + fp.flush() + posix.ftruncate(fp.fileno(), 0) + finally: + fp.close() @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") def test_truncate(self): @@ -290,30 +301,33 @@ class PosixTester(unittest.TestCase): finally: os.close(fd) + @unittest.skipUnless(hasattr(posix, 'dup'), + 'test needs posix.dup()') def test_dup(self): - if hasattr(posix, 'dup'): - fp = open(support.TESTFN) - try: - fd = posix.dup(fp.fileno()) - self.assertIsInstance(fd, int) - os.close(fd) - finally: - fp.close() + fp = open(support.TESTFN) + try: + fd = posix.dup(fp.fileno()) + self.assertIsInstance(fd, int) + os.close(fd) + finally: + fp.close() + @unittest.skipUnless(hasattr(posix, 'confstr'), + 'test needs posix.confstr()') def test_confstr(self): - if hasattr(posix, 'confstr'): - self.assertRaises(ValueError, posix.confstr, "CS_garbage") - self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) + self.assertRaises(ValueError, posix.confstr, "CS_garbage") + self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) + @unittest.skipUnless(hasattr(posix, 'dup2'), + 'test needs posix.dup2()') def test_dup2(self): - if hasattr(posix, 'dup2'): - fp1 = open(support.TESTFN) - fp2 = open(support.TESTFN) - try: - posix.dup2(fp1.fileno(), fp2.fileno()) - finally: - fp1.close() - fp2.close() + fp1 = open(support.TESTFN) + fp2 = open(support.TESTFN) + try: + posix.dup2(fp1.fileno(), fp2.fileno()) + finally: + fp1.close() + fp2.close() @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") @support.requires_linux_version(2, 6, 23) @@ -322,65 +336,69 @@ class PosixTester(unittest.TestCase): self.addCleanup(os.close, fd) self.assertTrue(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC) + @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), + 'test needs posix.O_EXLOCK') def test_osexlock(self): - if hasattr(posix, "O_EXLOCK"): + fd = os.open(support.TESTFN, + os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) + self.assertRaises(OSError, os.open, support.TESTFN, + os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) + os.close(fd) + + if hasattr(posix, "O_SHLOCK"): fd = os.open(support.TESTFN, - os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) + os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) self.assertRaises(OSError, os.open, support.TESTFN, os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) os.close(fd) - if hasattr(posix, "O_SHLOCK"): - fd = os.open(support.TESTFN, - os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) - self.assertRaises(OSError, os.open, support.TESTFN, - os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) - os.close(fd) - + @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), + 'test needs posix.O_SHLOCK') def test_osshlock(self): - if hasattr(posix, "O_SHLOCK"): - fd1 = os.open(support.TESTFN, + fd1 = os.open(support.TESTFN, + os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) + fd2 = os.open(support.TESTFN, + os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) + os.close(fd2) + os.close(fd1) + + if hasattr(posix, "O_EXLOCK"): + fd = os.open(support.TESTFN, os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) - fd2 = os.open(support.TESTFN, - os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) - os.close(fd2) - os.close(fd1) - - if hasattr(posix, "O_EXLOCK"): - fd = os.open(support.TESTFN, - os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) - self.assertRaises(OSError, os.open, support.TESTFN, - os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) - os.close(fd) + self.assertRaises(OSError, os.open, support.TESTFN, + os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) + os.close(fd) + @unittest.skipUnless(hasattr(posix, 'fstat'), + 'test needs posix.fstat()') def test_fstat(self): - if hasattr(posix, 'fstat'): - fp = open(support.TESTFN) - try: - self.assertTrue(posix.fstat(fp.fileno())) - self.assertTrue(posix.stat(fp.fileno())) - - self.assertRaisesRegex(TypeError, - 'should be string, bytes or integer, not', - posix.stat, float(fp.fileno())) - finally: - fp.close() - - def test_stat(self): - if hasattr(posix, 'stat'): - self.assertTrue(posix.stat(support.TESTFN)) - self.assertTrue(posix.stat(os.fsencode(support.TESTFN))) - self.assertTrue(posix.stat(bytearray(os.fsencode(support.TESTFN)))) + fp = open(support.TESTFN) + try: + self.assertTrue(posix.fstat(fp.fileno())) + self.assertTrue(posix.stat(fp.fileno())) self.assertRaisesRegex(TypeError, - 'can\'t specify None for path argument', - posix.stat, None) - self.assertRaisesRegex(TypeError, - 'should be string, bytes or integer, not', - posix.stat, list(support.TESTFN)) - self.assertRaisesRegex(TypeError, 'should be string, bytes or integer, not', - posix.stat, list(os.fsencode(support.TESTFN))) + posix.stat, float(fp.fileno())) + finally: + fp.close() + + @unittest.skipUnless(hasattr(posix, 'stat'), + 'test needs posix.stat()') + def test_stat(self): + self.assertTrue(posix.stat(support.TESTFN)) + self.assertTrue(posix.stat(os.fsencode(support.TESTFN))) + self.assertTrue(posix.stat(bytearray(os.fsencode(support.TESTFN)))) + + self.assertRaisesRegex(TypeError, + 'can\'t specify None for path argument', + posix.stat, None) + self.assertRaisesRegex(TypeError, + 'should be string, bytes or integer, not', + posix.stat, list(support.TESTFN)) + self.assertRaisesRegex(TypeError, + 'should be string, bytes or integer, not', + posix.stat, list(os.fsencode(support.TESTFN))) @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") def test_mkfifo(self): @@ -495,10 +513,10 @@ class PosixTester(unittest.TestCase): self._test_all_chown_common(posix.lchown, support.TESTFN, getattr(posix, 'lstat', None)) + @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') def test_chdir(self): - if hasattr(posix, 'chdir'): - posix.chdir(os.curdir) - self.assertRaises(OSError, posix.chdir, support.TESTFN) + posix.chdir(os.curdir) + self.assertRaises(OSError, posix.chdir, support.TESTFN) def test_listdir(self): self.assertTrue(support.TESTFN in posix.listdir(os.curdir)) @@ -528,25 +546,26 @@ class PosixTester(unittest.TestCase): sorted(posix.listdir(f)) ) + @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') def test_access(self): - if hasattr(posix, 'access'): - self.assertTrue(posix.access(support.TESTFN, os.R_OK)) + self.assertTrue(posix.access(support.TESTFN, os.R_OK)) + @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') def test_umask(self): - if hasattr(posix, 'umask'): - old_mask = posix.umask(0) - self.assertIsInstance(old_mask, int) - posix.umask(old_mask) + old_mask = posix.umask(0) + self.assertIsInstance(old_mask, int) + posix.umask(old_mask) + @unittest.skipUnless(hasattr(posix, 'strerror'), + 'test needs posix.strerror()') def test_strerror(self): - if hasattr(posix, 'strerror'): - self.assertTrue(posix.strerror(0)) + self.assertTrue(posix.strerror(0)) + @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') def test_pipe(self): - if hasattr(posix, 'pipe'): - reader, writer = posix.pipe() - os.close(reader) - os.close(writer) + reader, writer = posix.pipe() + os.close(reader) + os.close(writer) @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") @support.requires_linux_version(2, 6, 27) @@ -580,15 +599,15 @@ class PosixTester(unittest.TestCase): self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) + @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') def test_utime(self): - if hasattr(posix, 'utime'): - now = time.time() - posix.utime(support.TESTFN, None) - self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) - self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) - self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) - posix.utime(support.TESTFN, (int(now), int(now))) - posix.utime(support.TESTFN, (now, now)) + now = time.time() + posix.utime(support.TESTFN, None) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) + posix.utime(support.TESTFN, (int(now), int(now))) + posix.utime(support.TESTFN, (now, now)) def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): st = os.stat(target_file) @@ -665,6 +684,7 @@ class PosixTester(unittest.TestCase): self.assertEqual(type(k), item_type) self.assertEqual(type(v), item_type) + @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') def test_getcwd_long_pathnames(self): dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' curdir = os.getcwd() diff --git a/Lib/test/test_set.py b/Lib/test/test_set.py index 13cefec..bfef621 100644 --- a/Lib/test/test_set.py +++ b/Lib/test/test_set.py @@ -625,10 +625,10 @@ class TestSet(TestJointOps, unittest.TestCase): myset >= myobj self.assertTrue(myobj.le_called) - # C API test only available in a debug build - if hasattr(set, "test_c_api"): - def test_c_api(self): - self.assertEqual(set().test_c_api(), True) + @unittest.skipUnless(hasattr(set, "test_c_api"), + 'C API test only available in a debug build') + def test_c_api(self): + self.assertEqual(set().test_c_api(), True) class SetSubclass(set): pass diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 0fbe078..deb1577 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -195,37 +195,37 @@ class TestShutil(unittest.TestCase): self.assertIn(errors[1][2][1].filename, possible_args) - # See bug #1071513 for why we don't run this on cygwin - # and bug #1076467 for why we don't run this as root. - if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin' - and not (hasattr(os, 'geteuid') and os.geteuid() == 0)): - def test_on_error(self): - self.errorState = 0 - os.mkdir(TESTFN) - self.addCleanup(shutil.rmtree, TESTFN) - - self.child_file_path = os.path.join(TESTFN, 'a') - self.child_dir_path = os.path.join(TESTFN, 'b') - support.create_empty_file(self.child_file_path) - os.mkdir(self.child_dir_path) - old_dir_mode = os.stat(TESTFN).st_mode - old_child_file_mode = os.stat(self.child_file_path).st_mode - old_child_dir_mode = os.stat(self.child_dir_path).st_mode - # Make unwritable. - new_mode = stat.S_IREAD|stat.S_IEXEC - os.chmod(self.child_file_path, new_mode) - os.chmod(self.child_dir_path, new_mode) - os.chmod(TESTFN, new_mode) - - self.addCleanup(os.chmod, TESTFN, old_dir_mode) - self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) - self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) - - shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) - # Test whether onerror has actually been called. - self.assertEqual(self.errorState, 3, - "Expected call to onerror function did not " - "happen.") + @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()') + @unittest.skipIf(sys.platform[:6] == 'cygwin', + "This test can't be run on Cygwin (issue #1071513).") + @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, + "This test can't be run reliably as root (issue #1076467).") + def test_on_error(self): + self.errorState = 0 + os.mkdir(TESTFN) + self.addCleanup(shutil.rmtree, TESTFN) + + self.child_file_path = os.path.join(TESTFN, 'a') + self.child_dir_path = os.path.join(TESTFN, 'b') + support.create_empty_file(self.child_file_path) + os.mkdir(self.child_dir_path) + old_dir_mode = os.stat(TESTFN).st_mode + old_child_file_mode = os.stat(self.child_file_path).st_mode + old_child_dir_mode = os.stat(self.child_dir_path).st_mode + # Make unwritable. + new_mode = stat.S_IREAD|stat.S_IEXEC + os.chmod(self.child_file_path, new_mode) + os.chmod(self.child_dir_path, new_mode) + os.chmod(TESTFN, new_mode) + + self.addCleanup(os.chmod, TESTFN, old_dir_mode) + self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) + self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) + + shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) + # Test whether onerror has actually been called. + self.assertEqual(self.errorState, 3, + "Expected call to onerror function did not happen.") def check_args_to_onerror(self, func, arg, exc): # test_rmtree_errors deliberately runs rmtree @@ -807,38 +807,39 @@ class TestShutil(unittest.TestCase): finally: shutil.rmtree(TESTFN, ignore_errors=True) - if hasattr(os, "mkfifo"): - # Issue #3002: copyfile and copytree block indefinitely on named pipes - def test_copyfile_named_pipe(self): - os.mkfifo(TESTFN) - try: - self.assertRaises(shutil.SpecialFileError, - shutil.copyfile, TESTFN, TESTFN2) - self.assertRaises(shutil.SpecialFileError, - shutil.copyfile, __file__, TESTFN) - finally: - os.remove(TESTFN) + # Issue #3002: copyfile and copytree block indefinitely on named pipes + @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') + def test_copyfile_named_pipe(self): + os.mkfifo(TESTFN) + try: + self.assertRaises(shutil.SpecialFileError, + shutil.copyfile, TESTFN, TESTFN2) + self.assertRaises(shutil.SpecialFileError, + shutil.copyfile, __file__, TESTFN) + finally: + os.remove(TESTFN) - @support.skip_unless_symlink - def test_copytree_named_pipe(self): - os.mkdir(TESTFN) + @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') + @support.skip_unless_symlink + def test_copytree_named_pipe(self): + os.mkdir(TESTFN) + try: + subdir = os.path.join(TESTFN, "subdir") + os.mkdir(subdir) + pipe = os.path.join(subdir, "mypipe") + os.mkfifo(pipe) try: - subdir = os.path.join(TESTFN, "subdir") - os.mkdir(subdir) - pipe = os.path.join(subdir, "mypipe") - os.mkfifo(pipe) - try: - shutil.copytree(TESTFN, TESTFN2) - except shutil.Error as e: - errors = e.args[0] - self.assertEqual(len(errors), 1) - src, dst, error_msg = errors[0] - self.assertEqual("`%s` is a named pipe" % pipe, error_msg) - else: - self.fail("shutil.Error should have been raised") - finally: - shutil.rmtree(TESTFN, ignore_errors=True) - shutil.rmtree(TESTFN2, ignore_errors=True) + shutil.copytree(TESTFN, TESTFN2) + except shutil.Error as e: + errors = e.args[0] + self.assertEqual(len(errors), 1) + src, dst, error_msg = errors[0] + self.assertEqual("`%s` is a named pipe" % pipe, error_msg) + else: + self.fail("shutil.Error should have been raised") + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + shutil.rmtree(TESTFN2, ignore_errors=True) def test_copytree_special_func(self): diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index e995c99..beff31a 100644 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -817,16 +817,17 @@ class GeneralModuleTests(unittest.TestCase): self.assertRaises(TypeError, socket.if_nametoindex, 0) self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') + @unittest.skipUnless(hasattr(sys, 'getrefcount'), + 'test needs sys.getrefcount()') def testRefCountGetNameInfo(self): # Testing reference count for getnameinfo - if hasattr(sys, "getrefcount"): - try: - # On some versions, this loses a reference - orig = sys.getrefcount(__name__) - socket.getnameinfo(__name__,0) - except TypeError: - if sys.getrefcount(__name__) != orig: - self.fail("socket.getnameinfo loses a reference") + try: + # On some versions, this loses a reference + orig = sys.getrefcount(__name__) + socket.getnameinfo(__name__,0) + except TypeError: + if sys.getrefcount(__name__) != orig: + self.fail("socket.getnameinfo loses a reference") def testInterpreterCrash(self): # Making sure getnameinfo doesn't crash the interpreter @@ -931,17 +932,17 @@ class GeneralModuleTests(unittest.TestCase): # Check that setting it to an invalid type raises TypeError self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") + @unittest.skipUnless(hasattr(socket, 'inet_aton'), + 'test needs socket.inet_aton()') def testIPv4_inet_aton_fourbytes(self): - if not hasattr(socket, 'inet_aton'): - return # No inet_aton, nothing to check # Test that issue1008086 and issue767150 are fixed. # It must return 4 bytes. self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) + @unittest.skipUnless(hasattr(socket, 'inet_pton'), + 'test needs socket.inet_pton()') def testIPv4toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform from socket import inet_aton as f, inet_pton, AF_INET g = lambda a: inet_pton(AF_INET, a) @@ -970,9 +971,9 @@ class GeneralModuleTests(unittest.TestCase): assertInvalid(g, '1.2.3.4.5') assertInvalid(g, '::1') + @unittest.skipUnless(hasattr(socket, 'inet_pton'), + 'test needs socket.inet_pton()') def testIPv6toString(self): - if not hasattr(socket, 'inet_pton'): - return # No inet_pton() on this platform try: from socket import inet_pton, AF_INET6, has_ipv6 if not has_ipv6: @@ -1024,9 +1025,9 @@ class GeneralModuleTests(unittest.TestCase): assertInvalid('::1.2.3.4:0') assertInvalid('0.100.200.0:3:4:5:6:7:8') + @unittest.skipUnless(hasattr(socket, 'inet_ntop'), + 'test needs socket.inet_ntop()') def testStringToIPv4(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform from socket import inet_ntoa as f, inet_ntop, AF_INET g = lambda a: inet_ntop(AF_INET, a) assertInvalid = lambda func,a: self.assertRaises( @@ -1048,9 +1049,9 @@ class GeneralModuleTests(unittest.TestCase): assertInvalid(g, b'\x00' * 5) assertInvalid(g, b'\x00' * 16) + @unittest.skipUnless(hasattr(socket, 'inet_ntop'), + 'test needs socket.inet_ntop()') def testStringToIPv6(self): - if not hasattr(socket, 'inet_ntop'): - return # No inet_ntop() on this platform try: from socket import inet_ntop, AF_INET6, has_ipv6 if not has_ipv6: @@ -3660,6 +3661,8 @@ class TCPCloserTest(ThreadedTCPSocketTest): self.cli.connect((HOST, self.port)) time.sleep(1.0) +@unittest.skipUnless(hasattr(socket, 'socketpair'), + 'test needs socket.socketpair()') @unittest.skipUnless(thread, 'Threading required for this test.') class BasicSocketPairTest(SocketPairTest): @@ -3722,26 +3725,27 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): def _testSetBlocking(self): pass - if hasattr(socket, "SOCK_NONBLOCK"): - @support.requires_linux_version(2, 6, 28) - def testInitNonBlocking(self): - # reinit server socket - self.serv.close() - self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | - socket.SOCK_NONBLOCK) - self.port = support.bind_port(self.serv) - self.serv.listen(1) - # actual testing - start = time.time() - try: - self.serv.accept() - except OSError: - pass - end = time.time() - self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") - - def _testInitNonBlocking(self): + @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), + 'test needs socket.SOCK_NONBLOCK') + @support.requires_linux_version(2, 6, 28) + def testInitNonBlocking(self): + # reinit server socket + self.serv.close() + self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | + socket.SOCK_NONBLOCK) + self.port = support.bind_port(self.serv) + self.serv.listen(1) + # actual testing + start = time.time() + try: + self.serv.accept() + except OSError: pass + end = time.time() + self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") + + def _testInitNonBlocking(self): + pass def testInheritFlags(self): # Issue #7995: when calling accept() on a listening socket with a @@ -4431,12 +4435,12 @@ class TCPTimeoutTest(SocketTCPTest): if not ok: self.fail("accept() returned success when we did not expect it") + @unittest.skipUnless(hasattr(signal, 'alarm'), + 'test needs signal.alarm()') def testInterruptedTimeout(self): # XXX I don't know how to do this test on MSWindows or any other # plaform that doesn't support signal.alarm() or os.kill(), though # the bug should have existed on all platforms. - if not hasattr(signal, "alarm"): - return # can only test on *nix self.serv.settimeout(5.0) # must be longer than alarm class Alarm(Exception): pass @@ -4496,6 +4500,7 @@ class TestExceptions(unittest.TestCase): self.assertTrue(issubclass(socket.gaierror, OSError)) self.assertTrue(issubclass(socket.timeout, OSError)) +@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') class TestLinuxAbstractNamespace(unittest.TestCase): UNIX_PATH_MAX = 108 @@ -4531,6 +4536,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): finally: s.close() +@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') class TestUnixDomain(unittest.TestCase): def setUp(self): @@ -4680,10 +4686,10 @@ def isTipcAvailable(): for line in f: if line.startswith("tipc "): return True - if support.verbose: - print("TIPC module is not loaded, please 'sudo modprobe tipc'") return False +@unittest.skipUnless(isTipcAvailable(), + "TIPC module is not loaded, please 'sudo modprobe tipc'") class TIPCTest(unittest.TestCase): def testRDM(self): srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) @@ -4706,6 +4712,8 @@ class TIPCTest(unittest.TestCase): self.assertEqual(msg, MSG) +@unittest.skipUnless(isTipcAvailable(), + "TIPC module is not loaded, please 'sudo modprobe tipc'") class TIPCThreadableTest(unittest.TestCase, ThreadableTest): def __init__(self, methodName = 'runTest'): unittest.TestCase.__init__(self, methodName = methodName) @@ -5028,15 +5036,10 @@ def test_main(): InheritanceTest, NonblockConstantTest ]) - if hasattr(socket, "socketpair"): - tests.append(BasicSocketPairTest) - if hasattr(socket, "AF_UNIX"): - tests.append(TestUnixDomain) - if sys.platform == 'linux': - tests.append(TestLinuxAbstractNamespace) - if isTipcAvailable(): - tests.append(TIPCTest) - tests.append(TIPCThreadableTest) + tests.append(BasicSocketPairTest) + tests.append(TestUnixDomain) + tests.append(TestLinuxAbstractNamespace) + tests.extend([TIPCTest, TIPCThreadableTest]) tests.extend([BasicCANTest, CANTest]) tests.extend([BasicRDSTest, RDSTest]) tests.extend([ diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 02c6c1e..0617b30 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -27,7 +27,10 @@ TEST_STR = b"hello world\n" HOST = test.support.HOST HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") +requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS, + 'requires Unix sockets') HAVE_FORKING = hasattr(os, "fork") +requires_forking = unittest.skipUnless(HAVE_FORKING, 'requires forking') def signal_alarm(n): """Call signal.alarm when it exists (i.e. not on Windows).""" @@ -175,31 +178,33 @@ class SocketServerTest(unittest.TestCase): socketserver.StreamRequestHandler, self.stream_examine) - if HAVE_FORKING: - def test_ForkingTCPServer(self): - with simple_subprocess(self): - self.run_server(socketserver.ForkingTCPServer, - socketserver.StreamRequestHandler, - self.stream_examine) - - if HAVE_UNIX_SOCKETS: - def test_UnixStreamServer(self): - self.run_server(socketserver.UnixStreamServer, + @requires_forking + def test_ForkingTCPServer(self): + with simple_subprocess(self): + self.run_server(socketserver.ForkingTCPServer, socketserver.StreamRequestHandler, self.stream_examine) - def test_ThreadingUnixStreamServer(self): - self.run_server(socketserver.ThreadingUnixStreamServer, + @requires_unix_sockets + def test_UnixStreamServer(self): + self.run_server(socketserver.UnixStreamServer, + socketserver.StreamRequestHandler, + self.stream_examine) + + @requires_unix_sockets + def test_ThreadingUnixStreamServer(self): + self.run_server(socketserver.ThreadingUnixStreamServer, + socketserver.StreamRequestHandler, + self.stream_examine) + + @requires_unix_sockets + @requires_forking + def test_ForkingUnixStreamServer(self): + with simple_subprocess(self): + self.run_server(ForkingUnixStreamServer, socketserver.StreamRequestHandler, self.stream_examine) - if HAVE_FORKING: - def test_ForkingUnixStreamServer(self): - with simple_subprocess(self): - self.run_server(ForkingUnixStreamServer, - socketserver.StreamRequestHandler, - self.stream_examine) - def test_UDPServer(self): self.run_server(socketserver.UDPServer, socketserver.DatagramRequestHandler, @@ -210,12 +215,12 @@ class SocketServerTest(unittest.TestCase): socketserver.DatagramRequestHandler, self.dgram_examine) - if HAVE_FORKING: - def test_ForkingUDPServer(self): - with simple_subprocess(self): - self.run_server(socketserver.ForkingUDPServer, - socketserver.DatagramRequestHandler, - self.dgram_examine) + @requires_forking + def test_ForkingUDPServer(self): + with simple_subprocess(self): + self.run_server(socketserver.ForkingUDPServer, + socketserver.DatagramRequestHandler, + self.dgram_examine) @contextlib.contextmanager def mocked_select_module(self): @@ -252,22 +257,24 @@ class SocketServerTest(unittest.TestCase): # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: - # if HAVE_UNIX_SOCKETS: - # def test_UnixDatagramServer(self): - # self.run_server(socketserver.UnixDatagramServer, - # socketserver.DatagramRequestHandler, - # self.dgram_examine) + # @requires_unix_sockets + # def test_UnixDatagramServer(self): + # self.run_server(socketserver.UnixDatagramServer, + # socketserver.DatagramRequestHandler, + # self.dgram_examine) # - # def test_ThreadingUnixDatagramServer(self): - # self.run_server(socketserver.ThreadingUnixDatagramServer, - # socketserver.DatagramRequestHandler, - # self.dgram_examine) + # @requires_unix_sockets + # def test_ThreadingUnixDatagramServer(self): + # self.run_server(socketserver.ThreadingUnixDatagramServer, + # socketserver.DatagramRequestHandler, + # self.dgram_examine) # - # if HAVE_FORKING: - # def test_ForkingUnixDatagramServer(self): - # self.run_server(socketserver.ForkingUnixDatagramServer, - # socketserver.DatagramRequestHandler, - # self.dgram_examine) + # @requires_unix_sockets + # @requires_forking + # def test_ForkingUnixDatagramServer(self): + # self.run_server(socketserver.ForkingUnixDatagramServer, + # socketserver.DatagramRequestHandler, + # self.dgram_examine) @reap_threads def test_shutdown(self): diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index 8437745..0565f39 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -293,15 +293,16 @@ class SysModuleTest(unittest.TestCase): def test_call_tracing(self): self.assertRaises(TypeError, sys.call_tracing, type, 2) + @unittest.skipUnless(hasattr(sys, "setdlopenflags"), + 'test needs sys.setdlopenflags()') def test_dlopenflags(self): - if hasattr(sys, "setdlopenflags"): - self.assertTrue(hasattr(sys, "getdlopenflags")) - self.assertRaises(TypeError, sys.getdlopenflags, 42) - oldflags = sys.getdlopenflags() - self.assertRaises(TypeError, sys.setdlopenflags) - sys.setdlopenflags(oldflags+1) - self.assertEqual(sys.getdlopenflags(), oldflags+1) - sys.setdlopenflags(oldflags) + self.assertTrue(hasattr(sys, "getdlopenflags")) + self.assertRaises(TypeError, sys.getdlopenflags, 42) + oldflags = sys.getdlopenflags() + self.assertRaises(TypeError, sys.setdlopenflags) + sys.setdlopenflags(oldflags+1) + self.assertEqual(sys.getdlopenflags(), oldflags+1) + sys.setdlopenflags(oldflags) @test.support.refcount_test def test_refcount(self): diff --git a/Lib/test/test_warnings.py b/Lib/test/test_warnings.py index 13c7df1..87463ac 100644 --- a/Lib/test/test_warnings.py +++ b/Lib/test/test_warnings.py @@ -271,11 +271,10 @@ class WarnTests(BaseTest): finally: warning_tests.__file__ = filename + @unittest.skipUnless(hasattr(sys, 'argv'), 'test needs sys.argv') def test_missing_filename_main_with_argv(self): # If __file__ is not specified and the caller is __main__ and sys.argv # exists, then use sys.argv[0] as the file. - if not hasattr(sys, 'argv'): - return filename = warning_tests.__file__ module_name = warning_tests.__name__ try: diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index 2f6f840..1daa8f8 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -7,6 +7,13 @@ from test.support import bigmemtest, _1G, _4G zlib = support.import_module('zlib') +requires_Compress_copy = unittest.skipUnless( + hasattr(zlib.compressobj(), "copy"), + 'requires Compress.copy()') +requires_Decompress_copy = unittest.skipUnless( + hasattr(zlib.decompressobj(), "copy"), + 'requires Decompress.copy()') + class VersionTestCase(unittest.TestCase): @@ -381,39 +388,39 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): "mode=%i, level=%i") % (sync, level)) del obj + @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'), + 'requires zlib.Z_SYNC_FLUSH') def test_odd_flush(self): # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 import random + # Testing on 17K of "random" data - if hasattr(zlib, 'Z_SYNC_FLUSH'): - # Testing on 17K of "random" data - - # Create compressor and decompressor objects - co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) - dco = zlib.decompressobj() + # Create compressor and decompressor objects + co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + dco = zlib.decompressobj() - # Try 17K of data - # generate random data stream + # Try 17K of data + # generate random data stream + try: + # In 2.3 and later, WichmannHill is the RNG of the bug report + gen = random.WichmannHill() + except AttributeError: try: - # In 2.3 and later, WichmannHill is the RNG of the bug report - gen = random.WichmannHill() + # 2.2 called it Random + gen = random.Random() except AttributeError: - try: - # 2.2 called it Random - gen = random.Random() - except AttributeError: - # others might simply have a single RNG - gen = random - gen.seed(1) - data = genblock(1, 17 * 1024, generator=gen) - - # compress, sync-flush, and decompress - first = co.compress(data) - second = co.flush(zlib.Z_SYNC_FLUSH) - expanded = dco.decompress(first + second) - - # if decompressed data is different from the input data, choke. - self.assertEqual(expanded, data, "17K random source doesn't match") + # others might simply have a single RNG + gen = random + gen.seed(1) + data = genblock(1, 17 * 1024, generator=gen) + + # compress, sync-flush, and decompress + first = co.compress(data) + second = co.flush(zlib.Z_SYNC_FLUSH) + expanded = dco.decompress(first + second) + + # if decompressed data is different from the input data, choke. + self.assertEqual(expanded, data, "17K random source doesn't match") def test_empty_flush(self): # Test that calling .flush() on unused objects works. @@ -525,67 +532,69 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): data = zlib.compress(input2) self.assertEqual(dco.flush(), input1[1:]) - if hasattr(zlib.compressobj(), "copy"): - def test_compresscopy(self): - # Test copying a compression object - data0 = HAMLET_SCENE - data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") - c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) - bufs0 = [] - bufs0.append(c0.compress(data0)) - - c1 = c0.copy() - bufs1 = bufs0[:] - - bufs0.append(c0.compress(data0)) - bufs0.append(c0.flush()) - s0 = b''.join(bufs0) - - bufs1.append(c1.compress(data1)) - bufs1.append(c1.flush()) - s1 = b''.join(bufs1) - - self.assertEqual(zlib.decompress(s0),data0+data0) - self.assertEqual(zlib.decompress(s1),data0+data1) - - def test_badcompresscopy(self): - # Test copying a compression object in an inconsistent state - c = zlib.compressobj() - c.compress(HAMLET_SCENE) - c.flush() - self.assertRaises(ValueError, c.copy) - - if hasattr(zlib.decompressobj(), "copy"): - def test_decompresscopy(self): - # Test copying a decompression object - data = HAMLET_SCENE - comp = zlib.compress(data) - # Test type of return value - self.assertIsInstance(comp, bytes) - - d0 = zlib.decompressobj() - bufs0 = [] - bufs0.append(d0.decompress(comp[:32])) - - d1 = d0.copy() - bufs1 = bufs0[:] - - bufs0.append(d0.decompress(comp[32:])) - s0 = b''.join(bufs0) - - bufs1.append(d1.decompress(comp[32:])) - s1 = b''.join(bufs1) - - self.assertEqual(s0,s1) - self.assertEqual(s0,data) - - def test_baddecompresscopy(self): - # Test copying a compression object in an inconsistent state - data = zlib.compress(HAMLET_SCENE) - d = zlib.decompressobj() - d.decompress(data) - d.flush() - self.assertRaises(ValueError, d.copy) + @requires_Compress_copy + def test_compresscopy(self): + # Test copying a compression object + data0 = HAMLET_SCENE + data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") + c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + bufs0 = [] + bufs0.append(c0.compress(data0)) + + c1 = c0.copy() + bufs1 = bufs0[:] + + bufs0.append(c0.compress(data0)) + bufs0.append(c0.flush()) + s0 = b''.join(bufs0) + + bufs1.append(c1.compress(data1)) + bufs1.append(c1.flush()) + s1 = b''.join(bufs1) + + self.assertEqual(zlib.decompress(s0),data0+data0) + self.assertEqual(zlib.decompress(s1),data0+data1) + + @requires_Compress_copy + def test_badcompresscopy(self): + # Test copying a compression object in an inconsistent state + c = zlib.compressobj() + c.compress(HAMLET_SCENE) + c.flush() + self.assertRaises(ValueError, c.copy) + + @requires_Decompress_copy + def test_decompresscopy(self): + # Test copying a decompression object + data = HAMLET_SCENE + comp = zlib.compress(data) + # Test type of return value + self.assertIsInstance(comp, bytes) + + d0 = zlib.decompressobj() + bufs0 = [] + bufs0.append(d0.decompress(comp[:32])) + + d1 = d0.copy() + bufs1 = bufs0[:] + + bufs0.append(d0.decompress(comp[32:])) + s0 = b''.join(bufs0) + + bufs1.append(d1.decompress(comp[32:])) + s1 = b''.join(bufs1) + + self.assertEqual(s0,s1) + self.assertEqual(s0,data) + + @requires_Decompress_copy + def test_baddecompresscopy(self): + # Test copying a compression object in an inconsistent state + data = zlib.compress(HAMLET_SCENE) + d = zlib.decompressobj() + d.decompress(data) + d.flush() + self.assertRaises(ValueError, d.copy) # Memory use of the following functions takes into account overallocation |