From 5b10b9824780b2181158902067912ee9e7b04657 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Tue, 5 Mar 2019 10:06:26 +0200 Subject: bpo-22831: Use "with" to avoid possible fd leaks in tests (part 2). (GH-10929) --- Lib/test/support/__init__.py | 14 +-- Lib/test/support/script_helper.py | 51 +++++----- Lib/test/test_argparse.py | 20 ++-- Lib/test/test_binhex.py | 10 +- Lib/test/test_bool.py | 15 ++- Lib/test/test_codecs.py | 5 +- Lib/test/test_epoll.py | 17 ++-- Lib/test/test_float.py | 17 ++-- Lib/test/test_ioctl.py | 6 +- Lib/test/test_os.py | 5 +- Lib/test/test_pipes.py | 5 +- Lib/test/test_poll.py | 13 ++- Lib/test/test_random.py | 5 +- Lib/test/test_runpy.py | 5 +- Lib/test/test_select.py | 31 +++--- Lib/test/test_shelve.py | 45 ++++----- Lib/test/test_site.py | 7 +- Lib/test/test_socketserver.py | 36 ++++--- Lib/test/test_tempfile.py | 10 +- Lib/test/test_threading.py | 12 +-- Lib/test/test_urllib2.py | 6 +- Lib/test/test_urllib2_localnet.py | 28 +++--- Lib/test/test_xmlrpc.py | 12 +-- Lib/test/test_zipimport.py | 198 +++++++++++++++++-------------------- Lib/test/test_zipimport_support.py | 14 ++- 25 files changed, 264 insertions(+), 323 deletions(-) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index c0938d9..5bd15a2 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -706,9 +706,8 @@ def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): issue if/when we come across it. """ - tempsock = socket.socket(family, socktype) - port = bind_port(tempsock) - tempsock.close() + with socket.socket(family, socktype) as tempsock: + port = bind_port(tempsock) del tempsock return port @@ -1785,10 +1784,11 @@ class _MemoryWatchdog: sys.stderr.flush() return - watchdog_script = findfile("memory_watchdog.py") - self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], - stdin=f, stderr=subprocess.DEVNULL) - f.close() + with f: + watchdog_script = findfile("memory_watchdog.py") + self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], + stdin=f, + stderr=subprocess.DEVNULL) self.started = True def stop(self): diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py index 64b25aa..27a47f2 100644 --- a/Lib/test/support/script_helper.py +++ b/Lib/test/support/script_helper.py @@ -205,31 +205,28 @@ def make_script(script_dir, script_basename, source, omit_suffix=False): script_filename += os.extsep + 'py' script_name = os.path.join(script_dir, script_filename) # The script should be encoded to UTF-8, the default string encoding - script_file = open(script_name, 'w', encoding='utf-8') - script_file.write(source) - script_file.close() + with open(script_name, 'w', encoding='utf-8') as script_file: + script_file.write(source) importlib.invalidate_caches() return script_name def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): zip_filename = zip_basename+os.extsep+'zip' zip_name = os.path.join(zip_dir, zip_filename) - zip_file = zipfile.ZipFile(zip_name, 'w') - if name_in_zip is None: - parts = script_name.split(os.sep) - if len(parts) >= 2 and parts[-2] == '__pycache__': - legacy_pyc = make_legacy_pyc(source_from_cache(script_name)) - name_in_zip = os.path.basename(legacy_pyc) - script_name = legacy_pyc - else: - name_in_zip = os.path.basename(script_name) - zip_file.write(script_name, name_in_zip) - zip_file.close() + with zipfile.ZipFile(zip_name, 'w') as zip_file: + if name_in_zip is None: + parts = script_name.split(os.sep) + if len(parts) >= 2 and parts[-2] == '__pycache__': + legacy_pyc = make_legacy_pyc(source_from_cache(script_name)) + name_in_zip = os.path.basename(legacy_pyc) + script_name = legacy_pyc + else: + name_in_zip = os.path.basename(script_name) + zip_file.write(script_name, name_in_zip) #if test.support.verbose: - # zip_file = zipfile.ZipFile(zip_name, 'r') - # print 'Contents of %r:' % zip_name - # zip_file.printdir() - # zip_file.close() + # with zipfile.ZipFile(zip_name, 'r') as zip_file: + # print 'Contents of %r:' % zip_name + # zip_file.printdir() return zip_name, os.path.join(zip_name, name_in_zip) def make_pkg(pkg_dir, init_source=''): @@ -252,17 +249,15 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name)) zip_filename = zip_basename+os.extsep+'zip' zip_name = os.path.join(zip_dir, zip_filename) - zip_file = zipfile.ZipFile(zip_name, 'w') - for name in pkg_names: - init_name_in_zip = os.path.join(name, init_basename) - zip_file.write(init_name, init_name_in_zip) - zip_file.write(script_name, script_name_in_zip) - zip_file.close() + with zipfile.ZipFile(zip_name, 'w') as zip_file: + for name in pkg_names: + init_name_in_zip = os.path.join(name, init_basename) + zip_file.write(init_name, init_name_in_zip) + zip_file.write(script_name, script_name_in_zip) for name in unlink: os.unlink(name) #if test.support.verbose: - # zip_file = zipfile.ZipFile(zip_name, 'r') - # print 'Contents of %r:' % zip_name - # zip_file.printdir() - # zip_file.close() + # with zipfile.ZipFile(zip_name, 'r') as zip_file: + # print 'Contents of %r:' % zip_name + # zip_file.printdir() return zip_name, os.path.join(zip_name, script_name_in_zip) diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py index c0c7cb0..e849c7b 100644 --- a/Lib/test/test_argparse.py +++ b/Lib/test/test_argparse.py @@ -1379,9 +1379,8 @@ class TestArgumentsFromFile(TempDirMixin, ParserTestCase): ('invalid', '@no-such-path\n'), ] for path, text in file_texts: - file = open(path, 'w') - file.write(text) - file.close() + with open(path, 'w') as file: + file.write(text) parser_signature = Sig(fromfile_prefix_chars='@') argument_signatures = [ @@ -1410,9 +1409,8 @@ class TestArgumentsFromFileConverter(TempDirMixin, ParserTestCase): ('hello', 'hello world!\n'), ] for path, text in file_texts: - file = open(path, 'w') - file.write(text) - file.close() + with open(path, 'w') as file: + file.write(text) class FromFileConverterArgumentParser(ErrorRaisingArgumentParser): @@ -1493,9 +1491,8 @@ class TestFileTypeR(TempDirMixin, ParserTestCase): def setUp(self): super(TestFileTypeR, self).setUp() for file_name in ['foo', 'bar']: - file = open(os.path.join(self.temp_dir, file_name), 'w') - file.write(file_name) - file.close() + with open(os.path.join(self.temp_dir, file_name), 'w') as file: + file.write(file_name) self.create_readonly_file('readonly') argument_signatures = [ @@ -1534,9 +1531,8 @@ class TestFileTypeRB(TempDirMixin, ParserTestCase): def setUp(self): super(TestFileTypeRB, self).setUp() for file_name in ['foo', 'bar']: - file = open(os.path.join(self.temp_dir, file_name), 'w') - file.write(file_name) - file.close() + with open(os.path.join(self.temp_dir, file_name), 'w') as file: + file.write(file_name) argument_signatures = [ Sig('-x', type=argparse.FileType('rb')), diff --git a/Lib/test/test_binhex.py b/Lib/test/test_binhex.py index 21f4463..2f3d53a 100644 --- a/Lib/test/test_binhex.py +++ b/Lib/test/test_binhex.py @@ -23,17 +23,15 @@ class BinHexTestCase(unittest.TestCase): DATA = b'Jack is my hero' def test_binhex(self): - f = open(self.fname1, 'wb') - f.write(self.DATA) - f.close() + with open(self.fname1, 'wb') as f: + f.write(self.DATA) binhex.binhex(self.fname1, self.fname2) binhex.hexbin(self.fname2, self.fname1) - f = open(self.fname1, 'rb') - finish = f.readline() - f.close() + with open(self.fname1, 'rb') as f: + finish = f.readline() self.assertEqual(self.DATA, finish) diff --git a/Lib/test/test_bool.py b/Lib/test/test_bool.py index c5a1f1f..909a59a 100644 --- a/Lib/test/test_bool.py +++ b/Lib/test/test_bool.py @@ -20,13 +20,11 @@ class BoolTest(unittest.TestCase): def test_print(self): try: - fo = open(support.TESTFN, "w") - print(False, True, file=fo) - fo.close() - fo = open(support.TESTFN, "r") - self.assertEqual(fo.read(), 'False True\n') + with open(support.TESTFN, "w") as fo: + print(False, True, file=fo) + with open(support.TESTFN, "r") as fi: + self.assertEqual(fi.read(), 'False True\n') finally: - fo.close() os.remove(support.TESTFN) def test_repr(self): @@ -245,9 +243,8 @@ class BoolTest(unittest.TestCase): def test_fileclosed(self): try: - f = open(support.TESTFN, "w") - self.assertIs(f.closed, False) - f.close() + with open(support.TESTFN, "w") as f: + self.assertIs(f.closed, False) self.assertIs(f.closed, True) finally: os.remove(support.TESTFN) diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 79cddb8..893212e 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1242,9 +1242,8 @@ class EscapeDecodeTest(unittest.TestCase): class RecodingTest(unittest.TestCase): def test_recoding(self): f = io.BytesIO() - f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8") - f2.write("a") - f2.close() + with codecs.EncodedFile(f, "unicode_internal", "utf-8") as f2: + f2.write("a") # Python used to crash on this at exit because of a refcount # bug in _codecsmodule.c diff --git a/Lib/test/test_epoll.py b/Lib/test/test_epoll.py index efb54f4..53ce1d5 100644 --- a/Lib/test/test_epoll.py +++ b/Lib/test/test_epoll.py @@ -143,18 +143,17 @@ class TestEPoll(unittest.TestCase): def test_fromfd(self): server, client = self._connected_pair() - ep = select.epoll(2) - ep2 = select.epoll.fromfd(ep.fileno()) + with select.epoll(2) as ep: + ep2 = select.epoll.fromfd(ep.fileno()) - ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) - ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) + ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT) + ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT) - events = ep.poll(1, 4) - events2 = ep2.poll(0.9, 4) - self.assertEqual(len(events), 2) - self.assertEqual(len(events2), 2) + events = ep.poll(1, 4) + events2 = ep2.poll(0.9, 4) + self.assertEqual(len(events), 2) + self.assertEqual(len(events2), 2) - ep.close() try: ep2.poll(1, 4) except OSError as e: diff --git a/Lib/test/test_float.py b/Lib/test/test_float.py index 49c1fbc..5278d71 100644 --- a/Lib/test/test_float.py +++ b/Lib/test/test_float.py @@ -722,15 +722,14 @@ class FormatTestCase(unittest.TestCase): class ReprTestCase(unittest.TestCase): def test_repr(self): - floats_file = open(os.path.join(os.path.split(__file__)[0], - 'floating_points.txt')) - for line in floats_file: - line = line.strip() - if not line or line.startswith('#'): - continue - v = eval(line) - self.assertEqual(v, eval(repr(v))) - floats_file.close() + with open(os.path.join(os.path.split(__file__)[0], + 'floating_points.txt')) as floats_file: + for line in floats_file: + line = line.strip() + if not line or line.startswith('#'): + continue + v = eval(line) + self.assertEqual(v, eval(repr(v))) @unittest.skipUnless(getattr(sys, 'float_repr_style', '') == 'short', "applies only when using short float repr style") diff --git a/Lib/test/test_ioctl.py b/Lib/test/test_ioctl.py index b82b651..d1a5db9 100644 --- a/Lib/test/test_ioctl.py +++ b/Lib/test/test_ioctl.py @@ -11,9 +11,9 @@ try: except OSError: raise unittest.SkipTest("Unable to open /dev/tty") else: - # Skip if another process is in foreground - r = fcntl.ioctl(tty, termios.TIOCGPGRP, " ") - tty.close() + with tty: + # Skip if another process is in foreground + r = fcntl.ioctl(tty, termios.TIOCGPGRP, " ") rpgrp = struct.unpack("i", r)[0] if rpgrp not in (os.getpgrp(), os.getsid(0)): raise unittest.SkipTest("Neither the process group nor the session " diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 2340b84..746be12 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -1200,9 +1200,8 @@ class MakedirTests(unittest.TestCase): def test_exist_ok_existing_regular_file(self): base = support.TESTFN path = os.path.join(support.TESTFN, 'dir1') - f = open(path, 'w') - f.write('abc') - f.close() + with open(path, 'w') as f: + f.write('abc') self.assertRaises(OSError, os.makedirs, path) self.assertRaises(OSError, os.makedirs, path, exist_ok=False) self.assertRaises(OSError, os.makedirs, path, exist_ok=True) diff --git a/Lib/test/test_pipes.py b/Lib/test/test_pipes.py index ad01d08..1d538b9 100644 --- a/Lib/test/test_pipes.py +++ b/Lib/test/test_pipes.py @@ -23,9 +23,8 @@ class SimplePipeTests(unittest.TestCase): self.skipTest('tr is not available') t = pipes.Template() t.append(s_command, pipes.STDIN_STDOUT) - f = t.open(TESTFN, 'w') - f.write('hello world #1') - f.close() + with t.open(TESTFN, 'w') as f: + f.write('hello world #1') with open(TESTFN) as f: self.assertEqual(f.read(), 'HELLO WORLD #1') diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py index 445032d..ef966bf 100644 --- a/Lib/test/test_poll.py +++ b/Lib/test/test_poll.py @@ -83,13 +83,12 @@ class PollTests(unittest.TestCase): r = p.poll() self.assertEqual(r[0], (FD, select.POLLNVAL)) - f = open(TESTFN, 'w') - fd = f.fileno() - p = select.poll() - p.register(f) - r = p.poll() - self.assertEqual(r[0][0], fd) - f.close() + with open(TESTFN, 'w') as f: + fd = f.fileno() + p = select.poll() + p.register(f) + r = p.poll() + self.assertEqual(r[0][0], fd) r = p.poll() self.assertEqual(r[0], (fd, select.POLLNVAL)) os.unlink(TESTFN) diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py index b35cb39..e818a7b 100644 --- a/Lib/test/test_random.py +++ b/Lib/test/test_random.py @@ -268,9 +268,8 @@ class TestBasicOps: ("randv2_64.pck", 866), ("randv3.pck", 343)] for file, value in files: - f = open(support.findfile(file),"rb") - r = pickle.load(f) - f.close() + with open(support.findfile(file),"rb") as f: + r = pickle.load(f) self.assertEqual(int(r.random()*1000), value) def test_bug_9025(self): diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py index 02b4d62..800b483 100644 --- a/Lib/test/test_runpy.py +++ b/Lib/test/test_runpy.py @@ -238,9 +238,8 @@ class RunModuleTestCase(unittest.TestCase, CodeExecutionMixin): if verbose > 1: print(" Next level in:", sub_dir) if verbose > 1: print(" Created:", pkg_fname) mod_fname = os.path.join(sub_dir, test_fname) - mod_file = open(mod_fname, "w") - mod_file.write(source) - mod_file.close() + with open(mod_fname, "w") as mod_file: + mod_file.write(source) if verbose > 1: print(" Created:", mod_fname) mod_name = (pkg_name+".")*depth + mod_base mod_spec = importlib.util.spec_from_file_location(mod_name, diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py index a973f3f..458998a 100644 --- a/Lib/test/test_select.py +++ b/Lib/test/test_select.py @@ -46,24 +46,23 @@ class SelectTestCase(unittest.TestCase): def test_select(self): cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' - p = os.popen(cmd, 'r') - for tout in (0, 1, 2, 4, 8, 16) + (None,)*10: - if support.verbose: - print('timeout =', tout) - rfd, wfd, xfd = select.select([p], [], [], tout) - if (rfd, wfd, xfd) == ([], [], []): - continue - if (rfd, wfd, xfd) == ([p], [], []): - line = p.readline() + with os.popen(cmd) as p: + for tout in (0, 1, 2, 4, 8, 16) + (None,)*10: if support.verbose: - print(repr(line)) - if not line: + print('timeout =', tout) + rfd, wfd, xfd = select.select([p], [], [], tout) + if (rfd, wfd, xfd) == ([], [], []): + continue + if (rfd, wfd, xfd) == ([p], [], []): + line = p.readline() if support.verbose: - print('EOF') - break - continue - self.fail('Unexpected return values from select():', rfd, wfd, xfd) - p.close() + print(repr(line)) + if not line: + if support.verbose: + print('EOF') + break + continue + self.fail('Unexpected return values from select():', rfd, wfd, xfd) # Issue 16230: Crash on select resized list def test_select_mutated(self): diff --git a/Lib/test/test_shelve.py b/Lib/test/test_shelve.py index b71af2b..9ffe2cb 100644 --- a/Lib/test/test_shelve.py +++ b/Lib/test/test_shelve.py @@ -88,15 +88,13 @@ class TestCase(unittest.TestCase): def test_in_memory_shelf(self): d1 = byteskeydict() - s = shelve.Shelf(d1, protocol=0) - s['key1'] = (1,2,3,4) - self.assertEqual(s['key1'], (1,2,3,4)) - s.close() + with shelve.Shelf(d1, protocol=0) as s: + s['key1'] = (1,2,3,4) + self.assertEqual(s['key1'], (1,2,3,4)) d2 = byteskeydict() - s = shelve.Shelf(d2, protocol=1) - s['key1'] = (1,2,3,4) - self.assertEqual(s['key1'], (1,2,3,4)) - s.close() + with shelve.Shelf(d2, protocol=1) as s: + s['key1'] = (1,2,3,4) + self.assertEqual(s['key1'], (1,2,3,4)) self.assertEqual(len(d1), 1) self.assertEqual(len(d2), 1) @@ -104,20 +102,18 @@ class TestCase(unittest.TestCase): def test_mutable_entry(self): d1 = byteskeydict() - s = shelve.Shelf(d1, protocol=2, writeback=False) - s['key1'] = [1,2,3,4] - self.assertEqual(s['key1'], [1,2,3,4]) - s['key1'].append(5) - self.assertEqual(s['key1'], [1,2,3,4]) - s.close() + with shelve.Shelf(d1, protocol=2, writeback=False) as s: + s['key1'] = [1,2,3,4] + self.assertEqual(s['key1'], [1,2,3,4]) + s['key1'].append(5) + self.assertEqual(s['key1'], [1,2,3,4]) d2 = byteskeydict() - s = shelve.Shelf(d2, protocol=2, writeback=True) - s['key1'] = [1,2,3,4] - self.assertEqual(s['key1'], [1,2,3,4]) - s['key1'].append(5) - self.assertEqual(s['key1'], [1,2,3,4,5]) - s.close() + with shelve.Shelf(d2, protocol=2, writeback=True) as s: + s['key1'] = [1,2,3,4] + self.assertEqual(s['key1'], [1,2,3,4]) + s['key1'].append(5) + self.assertEqual(s['key1'], [1,2,3,4,5]) self.assertEqual(len(d1), 1) self.assertEqual(len(d2), 1) @@ -140,11 +136,10 @@ class TestCase(unittest.TestCase): d = {} key = 'key' encodedkey = key.encode('utf-8') - s = shelve.Shelf(d, writeback=True) - s[key] = [1] - p1 = d[encodedkey] # Will give a KeyError if backing store not updated - s['key'].append(2) - s.close() + with shelve.Shelf(d, writeback=True) as s: + s[key] = [1] + p1 = d[encodedkey] # Will give a KeyError if backing store not updated + s['key'].append(2) p2 = d[encodedkey] self.assertNotEqual(p1, p2) # Write creates new object in store diff --git a/Lib/test/test_site.py b/Lib/test/test_site.py index 8f04728..8643da0 100644 --- a/Lib/test/test_site.py +++ b/Lib/test/test_site.py @@ -125,10 +125,9 @@ class HelperFunctionsTests(unittest.TestCase): pth_dir = os.path.abspath(pth_dir) pth_basename = pth_name + '.pth' pth_fn = os.path.join(pth_dir, pth_basename) - pth_file = open(pth_fn, 'w', encoding='utf-8') - self.addCleanup(lambda: os.remove(pth_fn)) - pth_file.write(contents) - pth_file.close() + with open(pth_fn, 'w', encoding='utf-8') as pth_file: + self.addCleanup(lambda: os.remove(pth_fn)) + pth_file.write(contents) return pth_dir, pth_basename def test_addpackage_import_bad_syntax(self): diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index 6584ba5..8aed4b6 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -157,27 +157,25 @@ class SocketServerTest(unittest.TestCase): if verbose: print("done") def stream_examine(self, proto, addr): - s = socket.socket(proto, socket.SOCK_STREAM) - s.connect(addr) - s.sendall(TEST_STR) - buf = data = receive(s, 100) - while data and b'\n' not in buf: - data = receive(s, 100) - buf += data - self.assertEqual(buf, TEST_STR) - s.close() + with socket.socket(proto, socket.SOCK_STREAM) as s: + s.connect(addr) + s.sendall(TEST_STR) + buf = data = receive(s, 100) + while data and b'\n' not in buf: + data = receive(s, 100) + buf += data + self.assertEqual(buf, TEST_STR) def dgram_examine(self, proto, addr): - s = socket.socket(proto, socket.SOCK_DGRAM) - if HAVE_UNIX_SOCKETS and proto == socket.AF_UNIX: - s.bind(self.pickaddr(proto)) - s.sendto(TEST_STR, addr) - buf = data = receive(s, 100) - while data and b'\n' not in buf: - data = receive(s, 100) - buf += data - self.assertEqual(buf, TEST_STR) - s.close() + with socket.socket(proto, socket.SOCK_DGRAM) as s: + if HAVE_UNIX_SOCKETS and proto == socket.AF_UNIX: + s.bind(self.pickaddr(proto)) + s.sendto(TEST_STR, addr) + buf = data = receive(s, 100) + while data and b'\n' not in buf: + data = receive(s, 100) + buf += data + self.assertEqual(buf, TEST_STR) def test_TCPServer(self): self.run_server(socketserver.TCPServer, diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py index 3c0b9a7..489141d 100644 --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -573,9 +573,8 @@ class TestGetTempDir(BaseTestCase): # sneaky: just instantiate a NamedTemporaryFile, which # defaults to writing into the directory returned by # gettempdir. - file = tempfile.NamedTemporaryFile() - file.write(b"blat") - file.close() + with tempfile.NamedTemporaryFile() as file: + file.write(b"blat") def test_same_thing(self): # gettempdir always returns the same object @@ -891,9 +890,8 @@ class TestNamedTemporaryFile(BaseTestCase): # A NamedTemporaryFile is deleted when closed dir = tempfile.mkdtemp() try: - f = tempfile.NamedTemporaryFile(dir=dir) - f.write(b'blat') - f.close() + with tempfile.NamedTemporaryFile(dir=dir) as f: + f.write(b'blat') self.assertFalse(os.path.exists(f.name), "NamedTemporaryFile %s exists after close" % f.name) finally: diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index af2d6b5..2ddc77b 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -788,13 +788,11 @@ class ThreadJoinOnShutdown(BaseTestCase): def random_io(): '''Loop for a while sleeping random tiny amounts and doing some I/O.''' while True: - in_f = open(os.__file__, 'rb') - stuff = in_f.read(200) - null_f = open(os.devnull, 'wb') - null_f.write(stuff) - time.sleep(random.random() / 1995) - null_f.close() - in_f.close() + with open(os.__file__, 'rb') as in_f: + stuff = in_f.read(200) + with open(os.devnull, 'wb') as null_f: + null_f.write(stuff) + time.sleep(random.random() / 1995) thread_has_run.add(threading.current_thread()) def main(): diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py index 876fcd4..c6d275e 100644 --- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -57,10 +57,8 @@ class TrivialTests(unittest.TestCase): else: file_url = "file://%s" % fname - f = urllib.request.urlopen(file_url) - - f.read() - f.close() + with urllib.request.urlopen(file_url) as f: + f.read() def test_parse_http_list(self): tests = [ diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py index 77dec0c..591b48d 100644 --- a/Lib/test/test_urllib2_localnet.py +++ b/Lib/test/test_urllib2_localnet.py @@ -371,10 +371,9 @@ class ProxyAuthTests(unittest.TestCase): self.proxy_digest_handler.add_password(self.REALM, self.URL, self.USER, self.PASSWD) self.digest_auth_handler.set_qop("auth") - result = self.opener.open(self.URL) - while result.read(): - pass - result.close() + with self.opener.open(self.URL) as result: + while result.read(): + pass def test_proxy_qop_auth_int_works_or_throws_urlerror(self): self.proxy_digest_handler.add_password(self.REALM, self.URL, @@ -386,11 +385,11 @@ class ProxyAuthTests(unittest.TestCase): # It's okay if we don't support auth-int, but we certainly # shouldn't receive any kind of exception here other than # a URLError. - result = None - if result: - while result.read(): - pass - result.close() + pass + else: + with result: + while result.read(): + pass def GetRequestHandler(responses): @@ -611,14 +610,11 @@ class TestUrlopen(unittest.TestCase): def test_basic(self): handler = self.start_server() - open_url = urllib.request.urlopen("http://localhost:%s" % handler.port) - for attr in ("read", "close", "info", "geturl"): - self.assertTrue(hasattr(open_url, attr), "object returned from " - "urlopen lacks the %s attribute" % attr) - try: + with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url: + for attr in ("read", "close", "info", "geturl"): + self.assertTrue(hasattr(open_url, attr), "object returned from " + "urlopen lacks the %s attribute" % attr) self.assertTrue(open_url.read(), "calling 'read' failed") - finally: - open_url.close() def test_info(self): handler = self.start_server() diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 916e9c4..9c8b695 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -821,10 +821,9 @@ class SimpleServerTestCase(BaseServerTestCase): def test_404(self): # send POST with http.client, it should return 404 header and # 'Not Found' message. - conn = http.client.HTTPConnection(ADDR, PORT) - conn.request('POST', '/this-is-not-valid') - response = conn.getresponse() - conn.close() + with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: + conn.request('POST', '/this-is-not-valid') + response = conn.getresponse() self.assertEqual(response.status, 404) self.assertEqual(response.reason, 'Not Found') @@ -944,9 +943,8 @@ class SimpleServerTestCase(BaseServerTestCase): def test_partial_post(self): # Check that a partial POST doesn't make the server loop: issue #14001. - conn = http.client.HTTPConnection(ADDR, PORT) - conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') - conn.close() + with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn: + conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') def test_context_manager(self): with xmlrpclib.ServerProxy(URL) as server: diff --git a/Lib/test/test_zipimport.py b/Lib/test/test_zipimport.py index e98b090..d4f619e 100644 --- a/Lib/test/test_zipimport.py +++ b/Lib/test/test_zipimport.py @@ -422,57 +422,53 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase): packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc), "spam" + pyc_ext: (NOW, test_pyc)} - z = ZipFile(TEMP_ZIP, "w") - try: + self.addCleanup(support.unlink, TEMP_ZIP) + with ZipFile(TEMP_ZIP, "w") as z: for name, (mtime, data) in files.items(): zinfo = ZipInfo(name, time.localtime(mtime)) zinfo.compress_type = self.compression zinfo.comment = b"spam" z.writestr(zinfo, data) - z.close() - - zi = zipimport.zipimporter(TEMP_ZIP) - self.assertEqual(zi.archive, TEMP_ZIP) - self.assertEqual(zi.is_package(TESTPACK), True) - - find_mod = zi.find_module('spam') - self.assertIsNotNone(find_mod) - self.assertIsInstance(find_mod, zipimport.zipimporter) - self.assertFalse(find_mod.is_package('spam')) - load_mod = find_mod.load_module('spam') - self.assertEqual(find_mod.get_filename('spam'), load_mod.__file__) - - mod = zi.load_module(TESTPACK) - self.assertEqual(zi.get_filename(TESTPACK), mod.__file__) - - existing_pack_path = importlib.import_module(TESTPACK).__path__[0] - expected_path_path = os.path.join(TEMP_ZIP, TESTPACK) - self.assertEqual(existing_pack_path, expected_path_path) - - self.assertEqual(zi.is_package(packdir + '__init__'), False) - self.assertEqual(zi.is_package(packdir + TESTPACK2), True) - self.assertEqual(zi.is_package(packdir2 + TESTMOD), False) - - mod_path = packdir2 + TESTMOD - mod_name = module_path_to_dotted_name(mod_path) - mod = importlib.import_module(mod_name) - self.assertTrue(mod_name in sys.modules) - self.assertEqual(zi.get_source(TESTPACK), None) - self.assertEqual(zi.get_source(mod_path), None) - self.assertEqual(zi.get_filename(mod_path), mod.__file__) - # To pass in the module name instead of the path, we must use the - # right importer - loader = mod.__loader__ - self.assertEqual(loader.get_source(mod_name), None) - self.assertEqual(loader.get_filename(mod_name), mod.__file__) - - # test prefix and archivepath members - zi2 = zipimport.zipimporter(TEMP_ZIP + os.sep + TESTPACK) - self.assertEqual(zi2.archive, TEMP_ZIP) - self.assertEqual(zi2.prefix, TESTPACK + os.sep) - finally: - z.close() - os.remove(TEMP_ZIP) + + zi = zipimport.zipimporter(TEMP_ZIP) + self.assertEqual(zi.archive, TEMP_ZIP) + self.assertEqual(zi.is_package(TESTPACK), True) + + find_mod = zi.find_module('spam') + self.assertIsNotNone(find_mod) + self.assertIsInstance(find_mod, zipimport.zipimporter) + self.assertFalse(find_mod.is_package('spam')) + load_mod = find_mod.load_module('spam') + self.assertEqual(find_mod.get_filename('spam'), load_mod.__file__) + + mod = zi.load_module(TESTPACK) + self.assertEqual(zi.get_filename(TESTPACK), mod.__file__) + + existing_pack_path = importlib.import_module(TESTPACK).__path__[0] + expected_path_path = os.path.join(TEMP_ZIP, TESTPACK) + self.assertEqual(existing_pack_path, expected_path_path) + + self.assertEqual(zi.is_package(packdir + '__init__'), False) + self.assertEqual(zi.is_package(packdir + TESTPACK2), True) + self.assertEqual(zi.is_package(packdir2 + TESTMOD), False) + + mod_path = packdir2 + TESTMOD + mod_name = module_path_to_dotted_name(mod_path) + mod = importlib.import_module(mod_name) + self.assertTrue(mod_name in sys.modules) + self.assertEqual(zi.get_source(TESTPACK), None) + self.assertEqual(zi.get_source(mod_path), None) + self.assertEqual(zi.get_filename(mod_path), mod.__file__) + # To pass in the module name instead of the path, we must use the + # right importer + loader = mod.__loader__ + self.assertEqual(loader.get_source(mod_name), None) + self.assertEqual(loader.get_filename(mod_name), mod.__file__) + + # test prefix and archivepath members + zi2 = zipimport.zipimporter(TEMP_ZIP + os.sep + TESTPACK) + self.assertEqual(zi2.archive, TEMP_ZIP) + self.assertEqual(zi2.prefix, TESTPACK + os.sep) def testZipImporterMethodsInSubDirectory(self): packdir = TESTPACK + os.sep @@ -480,67 +476,60 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase): files = {packdir2 + "__init__" + pyc_ext: (NOW, test_pyc), packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)} - z = ZipFile(TEMP_ZIP, "w") - try: + self.addCleanup(support.unlink, TEMP_ZIP) + with ZipFile(TEMP_ZIP, "w") as z: for name, (mtime, data) in files.items(): zinfo = ZipInfo(name, time.localtime(mtime)) zinfo.compress_type = self.compression zinfo.comment = b"eggs" z.writestr(zinfo, data) - z.close() - - zi = zipimport.zipimporter(TEMP_ZIP + os.sep + packdir) - self.assertEqual(zi.archive, TEMP_ZIP) - self.assertEqual(zi.prefix, packdir) - self.assertEqual(zi.is_package(TESTPACK2), True) - mod = zi.load_module(TESTPACK2) - self.assertEqual(zi.get_filename(TESTPACK2), mod.__file__) - - self.assertEqual( - zi.is_package(TESTPACK2 + os.sep + '__init__'), False) - self.assertEqual( - zi.is_package(TESTPACK2 + os.sep + TESTMOD), False) - - pkg_path = TEMP_ZIP + os.sep + packdir + TESTPACK2 - zi2 = zipimport.zipimporter(pkg_path) - find_mod_dotted = zi2.find_module(TESTMOD) - self.assertIsNotNone(find_mod_dotted) - self.assertIsInstance(find_mod_dotted, zipimport.zipimporter) - self.assertFalse(zi2.is_package(TESTMOD)) - load_mod = find_mod_dotted.load_module(TESTMOD) - self.assertEqual( - find_mod_dotted.get_filename(TESTMOD), load_mod.__file__) - - mod_path = TESTPACK2 + os.sep + TESTMOD - mod_name = module_path_to_dotted_name(mod_path) - mod = importlib.import_module(mod_name) - self.assertTrue(mod_name in sys.modules) - self.assertEqual(zi.get_source(TESTPACK2), None) - self.assertEqual(zi.get_source(mod_path), None) - self.assertEqual(zi.get_filename(mod_path), mod.__file__) - # To pass in the module name instead of the path, we must use the - # right importer. - loader = mod.__loader__ - self.assertEqual(loader.get_source(mod_name), None) - self.assertEqual(loader.get_filename(mod_name), mod.__file__) - finally: - z.close() - os.remove(TEMP_ZIP) + + zi = zipimport.zipimporter(TEMP_ZIP + os.sep + packdir) + self.assertEqual(zi.archive, TEMP_ZIP) + self.assertEqual(zi.prefix, packdir) + self.assertEqual(zi.is_package(TESTPACK2), True) + mod = zi.load_module(TESTPACK2) + self.assertEqual(zi.get_filename(TESTPACK2), mod.__file__) + + self.assertEqual( + zi.is_package(TESTPACK2 + os.sep + '__init__'), False) + self.assertEqual( + zi.is_package(TESTPACK2 + os.sep + TESTMOD), False) + + pkg_path = TEMP_ZIP + os.sep + packdir + TESTPACK2 + zi2 = zipimport.zipimporter(pkg_path) + find_mod_dotted = zi2.find_module(TESTMOD) + self.assertIsNotNone(find_mod_dotted) + self.assertIsInstance(find_mod_dotted, zipimport.zipimporter) + self.assertFalse(zi2.is_package(TESTMOD)) + load_mod = find_mod_dotted.load_module(TESTMOD) + self.assertEqual( + find_mod_dotted.get_filename(TESTMOD), load_mod.__file__) + + mod_path = TESTPACK2 + os.sep + TESTMOD + mod_name = module_path_to_dotted_name(mod_path) + mod = importlib.import_module(mod_name) + self.assertTrue(mod_name in sys.modules) + self.assertEqual(zi.get_source(TESTPACK2), None) + self.assertEqual(zi.get_source(mod_path), None) + self.assertEqual(zi.get_filename(mod_path), mod.__file__) + # To pass in the module name instead of the path, we must use the + # right importer. + loader = mod.__loader__ + self.assertEqual(loader.get_source(mod_name), None) + self.assertEqual(loader.get_filename(mod_name), mod.__file__) def testGetData(self): - z = ZipFile(TEMP_ZIP, "w") - z.compression = self.compression - try: + self.addCleanup(support.unlink, TEMP_ZIP) + with ZipFile(TEMP_ZIP, "w") as z: + z.compression = self.compression name = "testdata.dat" data = bytes(x for x in range(256)) z.writestr(name, data) - z.close() - zi = zipimport.zipimporter(TEMP_ZIP) - self.assertEqual(data, zi.get_data(name)) - self.assertIn('zipimporter object', repr(zi)) - finally: - z.close() - os.remove(TEMP_ZIP) + + zi = zipimport.zipimporter(TEMP_ZIP) + self.assertEqual(data, zi.get_data(name)) + self.assertIn('zipimporter object', repr(zi)) def testImporterAttr(self): src = """if 1: # indent hack @@ -643,15 +632,12 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase): "need an unencodable filename") def testUnencodable(self): filename = support.TESTFN_UNENCODABLE + ".zip" - z = ZipFile(filename, "w") - zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW)) - zinfo.compress_type = self.compression - z.writestr(zinfo, test_src) - z.close() - try: - zipimport.zipimporter(filename).load_module(TESTMOD) - finally: - os.remove(filename) + self.addCleanup(support.unlink, filename) + with ZipFile(filename, "w") as z: + zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW)) + zinfo.compress_type = self.compression + z.writestr(zinfo, test_src) + zipimport.zipimporter(filename).load_module(TESTMOD) def testBytesPath(self): filename = support.TESTFN + ".zip" diff --git a/Lib/test/test_zipimport_support.py b/Lib/test/test_zipimport_support.py index 84d526c..8856101 100644 --- a/Lib/test/test_zipimport_support.py +++ b/Lib/test/test_zipimport_support.py @@ -122,15 +122,13 @@ class ZipSupportTests(unittest.TestCase): test_src) zip_name, run_name = make_zip_script(d, 'test_zip', script_name) - z = zipfile.ZipFile(zip_name, 'a') - for mod_name, src in sample_sources.items(): - z.writestr(mod_name + ".py", src) - z.close() + with zipfile.ZipFile(zip_name, 'a') as z: + for mod_name, src in sample_sources.items(): + z.writestr(mod_name + ".py", src) if verbose: - zip_file = zipfile.ZipFile(zip_name, 'r') - print ('Contents of %r:' % zip_name) - zip_file.printdir() - zip_file.close() + with zipfile.ZipFile(zip_name, 'r') as zip_file: + print ('Contents of %r:' % zip_name) + zip_file.printdir() os.remove(script_name) sys.path.insert(0, zip_name) import test_zipped_doctest -- cgit v0.12