summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2019-03-05 08:06:26 (GMT)
committerGitHub <noreply@github.com>2019-03-05 08:06:26 (GMT)
commit5b10b9824780b2181158902067912ee9e7b04657 (patch)
tree1c89bea944e6638eb008c8f106b2ee48cc9448d1
parent9e4861f52349011cd5916eef8e8344575e8ac426 (diff)
downloadcpython-5b10b9824780b2181158902067912ee9e7b04657.zip
cpython-5b10b9824780b2181158902067912ee9e7b04657.tar.gz
cpython-5b10b9824780b2181158902067912ee9e7b04657.tar.bz2
bpo-22831: Use "with" to avoid possible fd leaks in tests (part 2). (GH-10929)
-rw-r--r--Lib/test/support/__init__.py14
-rw-r--r--Lib/test/support/script_helper.py51
-rw-r--r--Lib/test/test_argparse.py20
-rw-r--r--Lib/test/test_binhex.py10
-rw-r--r--Lib/test/test_bool.py15
-rw-r--r--Lib/test/test_codecs.py5
-rw-r--r--Lib/test/test_epoll.py17
-rw-r--r--Lib/test/test_float.py17
-rw-r--r--Lib/test/test_ioctl.py6
-rw-r--r--Lib/test/test_os.py5
-rw-r--r--Lib/test/test_pipes.py5
-rw-r--r--Lib/test/test_poll.py13
-rw-r--r--Lib/test/test_random.py5
-rw-r--r--Lib/test/test_runpy.py5
-rw-r--r--Lib/test/test_select.py31
-rw-r--r--Lib/test/test_shelve.py45
-rw-r--r--Lib/test/test_site.py7
-rw-r--r--Lib/test/test_socketserver.py36
-rw-r--r--Lib/test/test_tempfile.py10
-rw-r--r--Lib/test/test_threading.py12
-rw-r--r--Lib/test/test_urllib2.py6
-rw-r--r--Lib/test/test_urllib2_localnet.py28
-rw-r--r--Lib/test/test_xmlrpc.py12
-rw-r--r--Lib/test/test_zipimport.py198
-rw-r--r--Lib/test/test_zipimport_support.py14
25 files changed, 264 insertions, 323 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index c0938d9..5bd15a2 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -706,9 +706,8 @@ def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
issue if/when we come across it.
"""
- tempsock = socket.socket(family, socktype)
- port = bind_port(tempsock)
- tempsock.close()
+ with socket.socket(family, socktype) as tempsock:
+ port = bind_port(tempsock)
del tempsock
return port
@@ -1785,10 +1784,11 @@ class _MemoryWatchdog:
sys.stderr.flush()
return
- watchdog_script = findfile("memory_watchdog.py")
- self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
- stdin=f, stderr=subprocess.DEVNULL)
- f.close()
+ with f:
+ watchdog_script = findfile("memory_watchdog.py")
+ self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
+ stdin=f,
+ stderr=subprocess.DEVNULL)
self.started = True
def stop(self):
diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py
index 64b25aa..27a47f2 100644
--- a/Lib/test/support/script_helper.py
+++ b/Lib/test/support/script_helper.py
@@ -205,31 +205,28 @@ def make_script(script_dir, script_basename, source, omit_suffix=False):
script_filename += os.extsep + 'py'
script_name = os.path.join(script_dir, script_filename)
# The script should be encoded to UTF-8, the default string encoding
- script_file = open(script_name, 'w', encoding='utf-8')
- script_file.write(source)
- script_file.close()
+ with open(script_name, 'w', encoding='utf-8') as script_file:
+ script_file.write(source)
importlib.invalidate_caches()
return script_name
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
- zip_file = zipfile.ZipFile(zip_name, 'w')
- if name_in_zip is None:
- parts = script_name.split(os.sep)
- if len(parts) >= 2 and parts[-2] == '__pycache__':
- legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
- name_in_zip = os.path.basename(legacy_pyc)
- script_name = legacy_pyc
- else:
- name_in_zip = os.path.basename(script_name)
- zip_file.write(script_name, name_in_zip)
- zip_file.close()
+ with zipfile.ZipFile(zip_name, 'w') as zip_file:
+ if name_in_zip is None:
+ parts = script_name.split(os.sep)
+ if len(parts) >= 2 and parts[-2] == '__pycache__':
+ legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
+ name_in_zip = os.path.basename(legacy_pyc)
+ script_name = legacy_pyc
+ else:
+ name_in_zip = os.path.basename(script_name)
+ zip_file.write(script_name, name_in_zip)
#if test.support.verbose:
- # zip_file = zipfile.ZipFile(zip_name, 'r')
- # print 'Contents of %r:' % zip_name
- # zip_file.printdir()
- # zip_file.close()
+ # with zipfile.ZipFile(zip_name, 'r') as zip_file:
+ # print 'Contents of %r:' % zip_name
+ # zip_file.printdir()
return zip_name, os.path.join(zip_name, name_in_zip)
def make_pkg(pkg_dir, init_source=''):
@@ -252,17 +249,15 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
- zip_file = zipfile.ZipFile(zip_name, 'w')
- for name in pkg_names:
- init_name_in_zip = os.path.join(name, init_basename)
- zip_file.write(init_name, init_name_in_zip)
- zip_file.write(script_name, script_name_in_zip)
- zip_file.close()
+ with zipfile.ZipFile(zip_name, 'w') as zip_file:
+ for name in pkg_names:
+ init_name_in_zip = os.path.join(name, init_basename)
+ zip_file.write(init_name, init_name_in_zip)
+ zip_file.write(script_name, script_name_in_zip)
for name in unlink:
os.unlink(name)
#if test.support.verbose:
- # zip_file = zipfile.ZipFile(zip_name, 'r')
- # print 'Contents of %r:' % zip_name
- # zip_file.printdir()
- # zip_file.close()
+ # with zipfile.ZipFile(zip_name, 'r') as zip_file:
+ # print 'Contents of %r:' % zip_name
+ # zip_file.printdir()
return zip_name, os.path.join(zip_name, script_name_in_zip)
diff --git a/Lib/test/test_argparse.py b/Lib/test/test_argparse.py
index c0c7cb0..e849c7b 100644
--- a/Lib/test/test_argparse.py
+++ b/Lib/test/test_argparse.py
@@ -1379,9 +1379,8 @@ class TestArgumentsFromFile(TempDirMixin, ParserTestCase):
('invalid', '@no-such-path\n'),
]
for path, text in file_texts:
- file = open(path, 'w')
- file.write(text)
- file.close()
+ with open(path, 'w') as file:
+ file.write(text)
parser_signature = Sig(fromfile_prefix_chars='@')
argument_signatures = [
@@ -1410,9 +1409,8 @@ class TestArgumentsFromFileConverter(TempDirMixin, ParserTestCase):
('hello', 'hello world!\n'),
]
for path, text in file_texts:
- file = open(path, 'w')
- file.write(text)
- file.close()
+ with open(path, 'w') as file:
+ file.write(text)
class FromFileConverterArgumentParser(ErrorRaisingArgumentParser):
@@ -1493,9 +1491,8 @@ class TestFileTypeR(TempDirMixin, ParserTestCase):
def setUp(self):
super(TestFileTypeR, self).setUp()
for file_name in ['foo', 'bar']:
- file = open(os.path.join(self.temp_dir, file_name), 'w')
- file.write(file_name)
- file.close()
+ with open(os.path.join(self.temp_dir, file_name), 'w') as file:
+ file.write(file_name)
self.create_readonly_file('readonly')
argument_signatures = [
@@ -1534,9 +1531,8 @@ class TestFileTypeRB(TempDirMixin, ParserTestCase):
def setUp(self):
super(TestFileTypeRB, self).setUp()
for file_name in ['foo', 'bar']:
- file = open(os.path.join(self.temp_dir, file_name), 'w')
- file.write(file_name)
- file.close()
+ with open(os.path.join(self.temp_dir, file_name), 'w') as file:
+ file.write(file_name)
argument_signatures = [
Sig('-x', type=argparse.FileType('rb')),
diff --git a/Lib/test/test_binhex.py b/Lib/test/test_binhex.py
index 21f4463..2f3d53a 100644
--- a/Lib/test/test_binhex.py
+++ b/Lib/test/test_binhex.py
@@ -23,17 +23,15 @@ class BinHexTestCase(unittest.TestCase):
DATA = b'Jack is my hero'
def test_binhex(self):
- f = open(self.fname1, 'wb')
- f.write(self.DATA)
- f.close()
+ with open(self.fname1, 'wb') as f:
+ f.write(self.DATA)
binhex.binhex(self.fname1, self.fname2)
binhex.hexbin(self.fname2, self.fname1)
- f = open(self.fname1, 'rb')
- finish = f.readline()
- f.close()
+ with open(self.fname1, 'rb') as f:
+ finish = f.readline()
self.assertEqual(self.DATA, finish)
diff --git a/Lib/test/test_bool.py b/Lib/test/test_bool.py
index c5a1f1f..909a59a 100644
--- a/Lib/test/test_bool.py
+++ b/Lib/test/test_bool.py
@@ -20,13 +20,11 @@ class BoolTest(unittest.TestCase):
def test_print(self):
try:
- fo = open(support.TESTFN, "w")
- print(False, True, file=fo)
- fo.close()
- fo = open(support.TESTFN, "r")
- self.assertEqual(fo.read(), 'False True\n')
+ with open(support.TESTFN, "w") as fo:
+ print(False, True, file=fo)
+ with open(support.TESTFN, "r") as fi:
+ self.assertEqual(fi.read(), 'False True\n')
finally:
- fo.close()
os.remove(support.TESTFN)
def test_repr(self):
@@ -245,9 +243,8 @@ class BoolTest(unittest.TestCase):
def test_fileclosed(self):
try:
- f = open(support.TESTFN, "w")
- self.assertIs(f.closed, False)
- f.close()
+ with open(support.TESTFN, "w") as f:
+ self.assertIs(f.closed, False)
self.assertIs(f.closed, True)
finally:
os.remove(support.TESTFN)
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 79cddb8..893212e 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -1242,9 +1242,8 @@ class EscapeDecodeTest(unittest.TestCase):
class RecodingTest(unittest.TestCase):
def test_recoding(self):
f = io.BytesIO()
- f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
- f2.write("a")
- f2.close()
+ with codecs.EncodedFile(f, "unicode_internal", "utf-8") as f2:
+ f2.write("a")
# Python used to crash on this at exit because of a refcount
# bug in _codecsmodule.c
diff --git a/Lib/test/test_epoll.py b/Lib/test/test_epoll.py
index efb54f4..53ce1d5 100644
--- a/Lib/test/test_epoll.py
+++ b/Lib/test/test_epoll.py
@@ -143,18 +143,17 @@ class TestEPoll(unittest.TestCase):
def test_fromfd(self):
server, client = self._connected_pair()
- ep = select.epoll(2)
- ep2 = select.epoll.fromfd(ep.fileno())
+ with select.epoll(2) as ep:
+ ep2 = select.epoll.fromfd(ep.fileno())
- ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
- ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
+ ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
+ ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
- events = ep.poll(1, 4)
- events2 = ep2.poll(0.9, 4)
- self.assertEqual(len(events), 2)
- self.assertEqual(len(events2), 2)
+ events = ep.poll(1, 4)
+ events2 = ep2.poll(0.9, 4)
+ self.assertEqual(len(events), 2)
+ self.assertEqual(len(events2), 2)
- ep.close()
try:
ep2.poll(1, 4)
except OSError as e:
diff --git a/Lib/test/test_float.py b/Lib/test/test_float.py
index 49c1fbc..5278d71 100644
--- a/Lib/test/test_float.py
+++ b/Lib/test/test_float.py
@@ -722,15 +722,14 @@ class FormatTestCase(unittest.TestCase):
class ReprTestCase(unittest.TestCase):
def test_repr(self):
- floats_file = open(os.path.join(os.path.split(__file__)[0],
- 'floating_points.txt'))
- for line in floats_file:
- line = line.strip()
- if not line or line.startswith('#'):
- continue
- v = eval(line)
- self.assertEqual(v, eval(repr(v)))
- floats_file.close()
+ with open(os.path.join(os.path.split(__file__)[0],
+ 'floating_points.txt')) as floats_file:
+ for line in floats_file:
+ line = line.strip()
+ if not line or line.startswith('#'):
+ continue
+ v = eval(line)
+ self.assertEqual(v, eval(repr(v)))
@unittest.skipUnless(getattr(sys, 'float_repr_style', '') == 'short',
"applies only when using short float repr style")
diff --git a/Lib/test/test_ioctl.py b/Lib/test/test_ioctl.py
index b82b651..d1a5db9 100644
--- a/Lib/test/test_ioctl.py
+++ b/Lib/test/test_ioctl.py
@@ -11,9 +11,9 @@ try:
except OSError:
raise unittest.SkipTest("Unable to open /dev/tty")
else:
- # Skip if another process is in foreground
- r = fcntl.ioctl(tty, termios.TIOCGPGRP, " ")
- tty.close()
+ with tty:
+ # Skip if another process is in foreground
+ r = fcntl.ioctl(tty, termios.TIOCGPGRP, " ")
rpgrp = struct.unpack("i", r)[0]
if rpgrp not in (os.getpgrp(), os.getsid(0)):
raise unittest.SkipTest("Neither the process group nor the session "
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 2340b84..746be12 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -1200,9 +1200,8 @@ class MakedirTests(unittest.TestCase):
def test_exist_ok_existing_regular_file(self):
base = support.TESTFN
path = os.path.join(support.TESTFN, 'dir1')
- f = open(path, 'w')
- f.write('abc')
- f.close()
+ with open(path, 'w') as f:
+ f.write('abc')
self.assertRaises(OSError, os.makedirs, path)
self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
diff --git a/Lib/test/test_pipes.py b/Lib/test/test_pipes.py
index ad01d08..1d538b9 100644
--- a/Lib/test/test_pipes.py
+++ b/Lib/test/test_pipes.py
@@ -23,9 +23,8 @@ class SimplePipeTests(unittest.TestCase):
self.skipTest('tr is not available')
t = pipes.Template()
t.append(s_command, pipes.STDIN_STDOUT)
- f = t.open(TESTFN, 'w')
- f.write('hello world #1')
- f.close()
+ with t.open(TESTFN, 'w') as f:
+ f.write('hello world #1')
with open(TESTFN) as f:
self.assertEqual(f.read(), 'HELLO WORLD #1')
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py
index 445032d..ef966bf 100644
--- a/Lib/test/test_poll.py
+++ b/Lib/test/test_poll.py
@@ -83,13 +83,12 @@ class PollTests(unittest.TestCase):
r = p.poll()
self.assertEqual(r[0], (FD, select.POLLNVAL))
- f = open(TESTFN, 'w')
- fd = f.fileno()
- p = select.poll()
- p.register(f)
- r = p.poll()
- self.assertEqual(r[0][0], fd)
- f.close()
+ with open(TESTFN, 'w') as f:
+ fd = f.fileno()
+ p = select.poll()
+ p.register(f)
+ r = p.poll()
+ self.assertEqual(r[0][0], fd)
r = p.poll()
self.assertEqual(r[0], (fd, select.POLLNVAL))
os.unlink(TESTFN)
diff --git a/Lib/test/test_random.py b/Lib/test/test_random.py
index b35cb39..e818a7b 100644
--- a/Lib/test/test_random.py
+++ b/Lib/test/test_random.py
@@ -268,9 +268,8 @@ class TestBasicOps:
("randv2_64.pck", 866),
("randv3.pck", 343)]
for file, value in files:
- f = open(support.findfile(file),"rb")
- r = pickle.load(f)
- f.close()
+ with open(support.findfile(file),"rb") as f:
+ r = pickle.load(f)
self.assertEqual(int(r.random()*1000), value)
def test_bug_9025(self):
diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py
index 02b4d62..800b483 100644
--- a/Lib/test/test_runpy.py
+++ b/Lib/test/test_runpy.py
@@ -238,9 +238,8 @@ class RunModuleTestCase(unittest.TestCase, CodeExecutionMixin):
if verbose > 1: print(" Next level in:", sub_dir)
if verbose > 1: print(" Created:", pkg_fname)
mod_fname = os.path.join(sub_dir, test_fname)
- mod_file = open(mod_fname, "w")
- mod_file.write(source)
- mod_file.close()
+ with open(mod_fname, "w") as mod_file:
+ mod_file.write(source)
if verbose > 1: print(" Created:", mod_fname)
mod_name = (pkg_name+".")*depth + mod_base
mod_spec = importlib.util.spec_from_file_location(mod_name,
diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py
index a973f3f..458998a 100644
--- a/Lib/test/test_select.py
+++ b/Lib/test/test_select.py
@@ -46,24 +46,23 @@ class SelectTestCase(unittest.TestCase):
def test_select(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
- p = os.popen(cmd, 'r')
- for tout in (0, 1, 2, 4, 8, 16) + (None,)*10:
- if support.verbose:
- print('timeout =', tout)
- rfd, wfd, xfd = select.select([p], [], [], tout)
- if (rfd, wfd, xfd) == ([], [], []):
- continue
- if (rfd, wfd, xfd) == ([p], [], []):
- line = p.readline()
+ with os.popen(cmd) as p:
+ for tout in (0, 1, 2, 4, 8, 16) + (None,)*10:
if support.verbose:
- print(repr(line))
- if not line:
+ print('timeout =', tout)
+ rfd, wfd, xfd = select.select([p], [], [], tout)
+ if (rfd, wfd, xfd) == ([], [], []):
+ continue
+ if (rfd, wfd, xfd) == ([p], [], []):
+ line = p.readline()
if support.verbose:
- print('EOF')
- break
- continue
- self.fail('Unexpected return values from select():', rfd, wfd, xfd)
- p.close()
+ print(repr(line))
+ if not line:
+ if support.verbose:
+ print('EOF')
+ break
+ continue
+ self.fail('Unexpected return values from select():', rfd, wfd, xfd)
# Issue 16230: Crash on select resized list
def test_select_mutated(self):
diff --git a/Lib/test/test_shelve.py b/Lib/test/test_shelve.py
index b71af2b..9ffe2cb 100644
--- a/Lib/test/test_shelve.py
+++ b/Lib/test/test_shelve.py
@@ -88,15 +88,13 @@ class TestCase(unittest.TestCase):
def test_in_memory_shelf(self):
d1 = byteskeydict()
- s = shelve.Shelf(d1, protocol=0)
- s['key1'] = (1,2,3,4)
- self.assertEqual(s['key1'], (1,2,3,4))
- s.close()
+ with shelve.Shelf(d1, protocol=0) as s:
+ s['key1'] = (1,2,3,4)
+ self.assertEqual(s['key1'], (1,2,3,4))
d2 = byteskeydict()
- s = shelve.Shelf(d2, protocol=1)
- s['key1'] = (1,2,3,4)
- self.assertEqual(s['key1'], (1,2,3,4))
- s.close()
+ with shelve.Shelf(d2, protocol=1) as s:
+ s['key1'] = (1,2,3,4)
+ self.assertEqual(s['key1'], (1,2,3,4))
self.assertEqual(len(d1), 1)
self.assertEqual(len(d2), 1)
@@ -104,20 +102,18 @@ class TestCase(unittest.TestCase):
def test_mutable_entry(self):
d1 = byteskeydict()
- s = shelve.Shelf(d1, protocol=2, writeback=False)
- s['key1'] = [1,2,3,4]
- self.assertEqual(s['key1'], [1,2,3,4])
- s['key1'].append(5)
- self.assertEqual(s['key1'], [1,2,3,4])
- s.close()
+ with shelve.Shelf(d1, protocol=2, writeback=False) as s:
+ s['key1'] = [1,2,3,4]
+ self.assertEqual(s['key1'], [1,2,3,4])
+ s['key1'].append(5)
+ self.assertEqual(s['key1'], [1,2,3,4])
d2 = byteskeydict()
- s = shelve.Shelf(d2, protocol=2, writeback=True)
- s['key1'] = [1,2,3,4]
- self.assertEqual(s['key1'], [1,2,3,4])
- s['key1'].append(5)
- self.assertEqual(s['key1'], [1,2,3,4,5])
- s.close()
+ with shelve.Shelf(d2, protocol=2, writeback=True) as s:
+ s['key1'] = [1,2,3,4]
+ self.assertEqual(s['key1'], [1,2,3,4])
+ s['key1'].append(5)
+ self.assertEqual(s['key1'], [1,2,3,4,5])
self.assertEqual(len(d1), 1)
self.assertEqual(len(d2), 1)
@@ -140,11 +136,10 @@ class TestCase(unittest.TestCase):
d = {}
key = 'key'
encodedkey = key.encode('utf-8')
- s = shelve.Shelf(d, writeback=True)
- s[key] = [1]
- p1 = d[encodedkey] # Will give a KeyError if backing store not updated
- s['key'].append(2)
- s.close()
+ with shelve.Shelf(d, writeback=True) as s:
+ s[key] = [1]
+ p1 = d[encodedkey] # Will give a KeyError if backing store not updated
+ s['key'].append(2)
p2 = d[encodedkey]
self.assertNotEqual(p1, p2) # Write creates new object in store
diff --git a/Lib/test/test_site.py b/Lib/test/test_site.py
index 8f04728..8643da0 100644
--- a/Lib/test/test_site.py
+++ b/Lib/test/test_site.py
@@ -125,10 +125,9 @@ class HelperFunctionsTests(unittest.TestCase):
pth_dir = os.path.abspath(pth_dir)
pth_basename = pth_name + '.pth'
pth_fn = os.path.join(pth_dir, pth_basename)
- pth_file = open(pth_fn, 'w', encoding='utf-8')
- self.addCleanup(lambda: os.remove(pth_fn))
- pth_file.write(contents)
- pth_file.close()
+ with open(pth_fn, 'w', encoding='utf-8') as pth_file:
+ self.addCleanup(lambda: os.remove(pth_fn))
+ pth_file.write(contents)
return pth_dir, pth_basename
def test_addpackage_import_bad_syntax(self):
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 6584ba5..8aed4b6 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -157,27 +157,25 @@ class SocketServerTest(unittest.TestCase):
if verbose: print("done")
def stream_examine(self, proto, addr):
- s = socket.socket(proto, socket.SOCK_STREAM)
- s.connect(addr)
- s.sendall(TEST_STR)
- buf = data = receive(s, 100)
- while data and b'\n' not in buf:
- data = receive(s, 100)
- buf += data
- self.assertEqual(buf, TEST_STR)
- s.close()
+ with socket.socket(proto, socket.SOCK_STREAM) as s:
+ s.connect(addr)
+ s.sendall(TEST_STR)
+ buf = data = receive(s, 100)
+ while data and b'\n' not in buf:
+ data = receive(s, 100)
+ buf += data
+ self.assertEqual(buf, TEST_STR)
def dgram_examine(self, proto, addr):
- s = socket.socket(proto, socket.SOCK_DGRAM)
- if HAVE_UNIX_SOCKETS and proto == socket.AF_UNIX:
- s.bind(self.pickaddr(proto))
- s.sendto(TEST_STR, addr)
- buf = data = receive(s, 100)
- while data and b'\n' not in buf:
- data = receive(s, 100)
- buf += data
- self.assertEqual(buf, TEST_STR)
- s.close()
+ with socket.socket(proto, socket.SOCK_DGRAM) as s:
+ if HAVE_UNIX_SOCKETS and proto == socket.AF_UNIX:
+ s.bind(self.pickaddr(proto))
+ s.sendto(TEST_STR, addr)
+ buf = data = receive(s, 100)
+ while data and b'\n' not in buf:
+ data = receive(s, 100)
+ buf += data
+ self.assertEqual(buf, TEST_STR)
def test_TCPServer(self):
self.run_server(socketserver.TCPServer,
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index 3c0b9a7..489141d 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -573,9 +573,8 @@ class TestGetTempDir(BaseTestCase):
# sneaky: just instantiate a NamedTemporaryFile, which
# defaults to writing into the directory returned by
# gettempdir.
- file = tempfile.NamedTemporaryFile()
- file.write(b"blat")
- file.close()
+ with tempfile.NamedTemporaryFile() as file:
+ file.write(b"blat")
def test_same_thing(self):
# gettempdir always returns the same object
@@ -891,9 +890,8 @@ class TestNamedTemporaryFile(BaseTestCase):
# A NamedTemporaryFile is deleted when closed
dir = tempfile.mkdtemp()
try:
- f = tempfile.NamedTemporaryFile(dir=dir)
- f.write(b'blat')
- f.close()
+ with tempfile.NamedTemporaryFile(dir=dir) as f:
+ f.write(b'blat')
self.assertFalse(os.path.exists(f.name),
"NamedTemporaryFile %s exists after close" % f.name)
finally:
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index af2d6b5..2ddc77b 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -788,13 +788,11 @@ class ThreadJoinOnShutdown(BaseTestCase):
def random_io():
'''Loop for a while sleeping random tiny amounts and doing some I/O.'''
while True:
- in_f = open(os.__file__, 'rb')
- stuff = in_f.read(200)
- null_f = open(os.devnull, 'wb')
- null_f.write(stuff)
- time.sleep(random.random() / 1995)
- null_f.close()
- in_f.close()
+ with open(os.__file__, 'rb') as in_f:
+ stuff = in_f.read(200)
+ with open(os.devnull, 'wb') as null_f:
+ null_f.write(stuff)
+ time.sleep(random.random() / 1995)
thread_has_run.add(threading.current_thread())
def main():
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 876fcd4..c6d275e 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -57,10 +57,8 @@ class TrivialTests(unittest.TestCase):
else:
file_url = "file://%s" % fname
- f = urllib.request.urlopen(file_url)
-
- f.read()
- f.close()
+ with urllib.request.urlopen(file_url) as f:
+ f.read()
def test_parse_http_list(self):
tests = [
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py
index 77dec0c..591b48d 100644
--- a/Lib/test/test_urllib2_localnet.py
+++ b/Lib/test/test_urllib2_localnet.py
@@ -371,10 +371,9 @@ class ProxyAuthTests(unittest.TestCase):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD)
self.digest_auth_handler.set_qop("auth")
- result = self.opener.open(self.URL)
- while result.read():
- pass
- result.close()
+ with self.opener.open(self.URL) as result:
+ while result.read():
+ pass
def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
@@ -386,11 +385,11 @@ class ProxyAuthTests(unittest.TestCase):
# It's okay if we don't support auth-int, but we certainly
# shouldn't receive any kind of exception here other than
# a URLError.
- result = None
- if result:
- while result.read():
- pass
- result.close()
+ pass
+ else:
+ with result:
+ while result.read():
+ pass
def GetRequestHandler(responses):
@@ -611,14 +610,11 @@ class TestUrlopen(unittest.TestCase):
def test_basic(self):
handler = self.start_server()
- open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
- for attr in ("read", "close", "info", "geturl"):
- self.assertTrue(hasattr(open_url, attr), "object returned from "
- "urlopen lacks the %s attribute" % attr)
- try:
+ with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
+ for attr in ("read", "close", "info", "geturl"):
+ self.assertTrue(hasattr(open_url, attr), "object returned from "
+ "urlopen lacks the %s attribute" % attr)
self.assertTrue(open_url.read(), "calling 'read' failed")
- finally:
- open_url.close()
def test_info(self):
handler = self.start_server()
diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py
index 916e9c4..9c8b695 100644
--- a/Lib/test/test_xmlrpc.py
+++ b/Lib/test/test_xmlrpc.py
@@ -821,10 +821,9 @@ class SimpleServerTestCase(BaseServerTestCase):
def test_404(self):
# send POST with http.client, it should return 404 header and
# 'Not Found' message.
- conn = http.client.HTTPConnection(ADDR, PORT)
- conn.request('POST', '/this-is-not-valid')
- response = conn.getresponse()
- conn.close()
+ with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
+ conn.request('POST', '/this-is-not-valid')
+ response = conn.getresponse()
self.assertEqual(response.status, 404)
self.assertEqual(response.reason, 'Not Found')
@@ -944,9 +943,8 @@ class SimpleServerTestCase(BaseServerTestCase):
def test_partial_post(self):
# Check that a partial POST doesn't make the server loop: issue #14001.
- conn = http.client.HTTPConnection(ADDR, PORT)
- conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
- conn.close()
+ with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
+ conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
def test_context_manager(self):
with xmlrpclib.ServerProxy(URL) as server:
diff --git a/Lib/test/test_zipimport.py b/Lib/test/test_zipimport.py
index e98b090..d4f619e 100644
--- a/Lib/test/test_zipimport.py
+++ b/Lib/test/test_zipimport.py
@@ -422,57 +422,53 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc),
"spam" + pyc_ext: (NOW, test_pyc)}
- z = ZipFile(TEMP_ZIP, "w")
- try:
+ self.addCleanup(support.unlink, TEMP_ZIP)
+ with ZipFile(TEMP_ZIP, "w") as z:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
zinfo.comment = b"spam"
z.writestr(zinfo, data)
- z.close()
-
- zi = zipimport.zipimporter(TEMP_ZIP)
- self.assertEqual(zi.archive, TEMP_ZIP)
- self.assertEqual(zi.is_package(TESTPACK), True)
-
- find_mod = zi.find_module('spam')
- self.assertIsNotNone(find_mod)
- self.assertIsInstance(find_mod, zipimport.zipimporter)
- self.assertFalse(find_mod.is_package('spam'))
- load_mod = find_mod.load_module('spam')
- self.assertEqual(find_mod.get_filename('spam'), load_mod.__file__)
-
- mod = zi.load_module(TESTPACK)
- self.assertEqual(zi.get_filename(TESTPACK), mod.__file__)
-
- existing_pack_path = importlib.import_module(TESTPACK).__path__[0]
- expected_path_path = os.path.join(TEMP_ZIP, TESTPACK)
- self.assertEqual(existing_pack_path, expected_path_path)
-
- self.assertEqual(zi.is_package(packdir + '__init__'), False)
- self.assertEqual(zi.is_package(packdir + TESTPACK2), True)
- self.assertEqual(zi.is_package(packdir2 + TESTMOD), False)
-
- mod_path = packdir2 + TESTMOD
- mod_name = module_path_to_dotted_name(mod_path)
- mod = importlib.import_module(mod_name)
- self.assertTrue(mod_name in sys.modules)
- self.assertEqual(zi.get_source(TESTPACK), None)
- self.assertEqual(zi.get_source(mod_path), None)
- self.assertEqual(zi.get_filename(mod_path), mod.__file__)
- # To pass in the module name instead of the path, we must use the
- # right importer
- loader = mod.__loader__
- self.assertEqual(loader.get_source(mod_name), None)
- self.assertEqual(loader.get_filename(mod_name), mod.__file__)
-
- # test prefix and archivepath members
- zi2 = zipimport.zipimporter(TEMP_ZIP + os.sep + TESTPACK)
- self.assertEqual(zi2.archive, TEMP_ZIP)
- self.assertEqual(zi2.prefix, TESTPACK + os.sep)
- finally:
- z.close()
- os.remove(TEMP_ZIP)
+
+ zi = zipimport.zipimporter(TEMP_ZIP)
+ self.assertEqual(zi.archive, TEMP_ZIP)
+ self.assertEqual(zi.is_package(TESTPACK), True)
+
+ find_mod = zi.find_module('spam')
+ self.assertIsNotNone(find_mod)
+ self.assertIsInstance(find_mod, zipimport.zipimporter)
+ self.assertFalse(find_mod.is_package('spam'))
+ load_mod = find_mod.load_module('spam')
+ self.assertEqual(find_mod.get_filename('spam'), load_mod.__file__)
+
+ mod = zi.load_module(TESTPACK)
+ self.assertEqual(zi.get_filename(TESTPACK), mod.__file__)
+
+ existing_pack_path = importlib.import_module(TESTPACK).__path__[0]
+ expected_path_path = os.path.join(TEMP_ZIP, TESTPACK)
+ self.assertEqual(existing_pack_path, expected_path_path)
+
+ self.assertEqual(zi.is_package(packdir + '__init__'), False)
+ self.assertEqual(zi.is_package(packdir + TESTPACK2), True)
+ self.assertEqual(zi.is_package(packdir2 + TESTMOD), False)
+
+ mod_path = packdir2 + TESTMOD
+ mod_name = module_path_to_dotted_name(mod_path)
+ mod = importlib.import_module(mod_name)
+ self.assertTrue(mod_name in sys.modules)
+ self.assertEqual(zi.get_source(TESTPACK), None)
+ self.assertEqual(zi.get_source(mod_path), None)
+ self.assertEqual(zi.get_filename(mod_path), mod.__file__)
+ # To pass in the module name instead of the path, we must use the
+ # right importer
+ loader = mod.__loader__
+ self.assertEqual(loader.get_source(mod_name), None)
+ self.assertEqual(loader.get_filename(mod_name), mod.__file__)
+
+ # test prefix and archivepath members
+ zi2 = zipimport.zipimporter(TEMP_ZIP + os.sep + TESTPACK)
+ self.assertEqual(zi2.archive, TEMP_ZIP)
+ self.assertEqual(zi2.prefix, TESTPACK + os.sep)
def testZipImporterMethodsInSubDirectory(self):
packdir = TESTPACK + os.sep
@@ -480,67 +476,60 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
files = {packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
- z = ZipFile(TEMP_ZIP, "w")
- try:
+ self.addCleanup(support.unlink, TEMP_ZIP)
+ with ZipFile(TEMP_ZIP, "w") as z:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
zinfo.comment = b"eggs"
z.writestr(zinfo, data)
- z.close()
-
- zi = zipimport.zipimporter(TEMP_ZIP + os.sep + packdir)
- self.assertEqual(zi.archive, TEMP_ZIP)
- self.assertEqual(zi.prefix, packdir)
- self.assertEqual(zi.is_package(TESTPACK2), True)
- mod = zi.load_module(TESTPACK2)
- self.assertEqual(zi.get_filename(TESTPACK2), mod.__file__)
-
- self.assertEqual(
- zi.is_package(TESTPACK2 + os.sep + '__init__'), False)
- self.assertEqual(
- zi.is_package(TESTPACK2 + os.sep + TESTMOD), False)
-
- pkg_path = TEMP_ZIP + os.sep + packdir + TESTPACK2
- zi2 = zipimport.zipimporter(pkg_path)
- find_mod_dotted = zi2.find_module(TESTMOD)
- self.assertIsNotNone(find_mod_dotted)
- self.assertIsInstance(find_mod_dotted, zipimport.zipimporter)
- self.assertFalse(zi2.is_package(TESTMOD))
- load_mod = find_mod_dotted.load_module(TESTMOD)
- self.assertEqual(
- find_mod_dotted.get_filename(TESTMOD), load_mod.__file__)
-
- mod_path = TESTPACK2 + os.sep + TESTMOD
- mod_name = module_path_to_dotted_name(mod_path)
- mod = importlib.import_module(mod_name)
- self.assertTrue(mod_name in sys.modules)
- self.assertEqual(zi.get_source(TESTPACK2), None)
- self.assertEqual(zi.get_source(mod_path), None)
- self.assertEqual(zi.get_filename(mod_path), mod.__file__)
- # To pass in the module name instead of the path, we must use the
- # right importer.
- loader = mod.__loader__
- self.assertEqual(loader.get_source(mod_name), None)
- self.assertEqual(loader.get_filename(mod_name), mod.__file__)
- finally:
- z.close()
- os.remove(TEMP_ZIP)
+
+ zi = zipimport.zipimporter(TEMP_ZIP + os.sep + packdir)
+ self.assertEqual(zi.archive, TEMP_ZIP)
+ self.assertEqual(zi.prefix, packdir)
+ self.assertEqual(zi.is_package(TESTPACK2), True)
+ mod = zi.load_module(TESTPACK2)
+ self.assertEqual(zi.get_filename(TESTPACK2), mod.__file__)
+
+ self.assertEqual(
+ zi.is_package(TESTPACK2 + os.sep + '__init__'), False)
+ self.assertEqual(
+ zi.is_package(TESTPACK2 + os.sep + TESTMOD), False)
+
+ pkg_path = TEMP_ZIP + os.sep + packdir + TESTPACK2
+ zi2 = zipimport.zipimporter(pkg_path)
+ find_mod_dotted = zi2.find_module(TESTMOD)
+ self.assertIsNotNone(find_mod_dotted)
+ self.assertIsInstance(find_mod_dotted, zipimport.zipimporter)
+ self.assertFalse(zi2.is_package(TESTMOD))
+ load_mod = find_mod_dotted.load_module(TESTMOD)
+ self.assertEqual(
+ find_mod_dotted.get_filename(TESTMOD), load_mod.__file__)
+
+ mod_path = TESTPACK2 + os.sep + TESTMOD
+ mod_name = module_path_to_dotted_name(mod_path)
+ mod = importlib.import_module(mod_name)
+ self.assertTrue(mod_name in sys.modules)
+ self.assertEqual(zi.get_source(TESTPACK2), None)
+ self.assertEqual(zi.get_source(mod_path), None)
+ self.assertEqual(zi.get_filename(mod_path), mod.__file__)
+ # To pass in the module name instead of the path, we must use the
+ # right importer.
+ loader = mod.__loader__
+ self.assertEqual(loader.get_source(mod_name), None)
+ self.assertEqual(loader.get_filename(mod_name), mod.__file__)
def testGetData(self):
- z = ZipFile(TEMP_ZIP, "w")
- z.compression = self.compression
- try:
+ self.addCleanup(support.unlink, TEMP_ZIP)
+ with ZipFile(TEMP_ZIP, "w") as z:
+ z.compression = self.compression
name = "testdata.dat"
data = bytes(x for x in range(256))
z.writestr(name, data)
- z.close()
- zi = zipimport.zipimporter(TEMP_ZIP)
- self.assertEqual(data, zi.get_data(name))
- self.assertIn('zipimporter object', repr(zi))
- finally:
- z.close()
- os.remove(TEMP_ZIP)
+
+ zi = zipimport.zipimporter(TEMP_ZIP)
+ self.assertEqual(data, zi.get_data(name))
+ self.assertIn('zipimporter object', repr(zi))
def testImporterAttr(self):
src = """if 1: # indent hack
@@ -643,15 +632,12 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
"need an unencodable filename")
def testUnencodable(self):
filename = support.TESTFN_UNENCODABLE + ".zip"
- z = ZipFile(filename, "w")
- zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW))
- zinfo.compress_type = self.compression
- z.writestr(zinfo, test_src)
- z.close()
- try:
- zipimport.zipimporter(filename).load_module(TESTMOD)
- finally:
- os.remove(filename)
+ self.addCleanup(support.unlink, filename)
+ with ZipFile(filename, "w") as z:
+ zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW))
+ zinfo.compress_type = self.compression
+ z.writestr(zinfo, test_src)
+ zipimport.zipimporter(filename).load_module(TESTMOD)
def testBytesPath(self):
filename = support.TESTFN + ".zip"
diff --git a/Lib/test/test_zipimport_support.py b/Lib/test/test_zipimport_support.py
index 84d526c..8856101 100644
--- a/Lib/test/test_zipimport_support.py
+++ b/Lib/test/test_zipimport_support.py
@@ -122,15 +122,13 @@ class ZipSupportTests(unittest.TestCase):
test_src)
zip_name, run_name = make_zip_script(d, 'test_zip',
script_name)
- z = zipfile.ZipFile(zip_name, 'a')
- for mod_name, src in sample_sources.items():
- z.writestr(mod_name + ".py", src)
- z.close()
+ with zipfile.ZipFile(zip_name, 'a') as z:
+ for mod_name, src in sample_sources.items():
+ z.writestr(mod_name + ".py", src)
if verbose:
- zip_file = zipfile.ZipFile(zip_name, 'r')
- print ('Contents of %r:' % zip_name)
- zip_file.printdir()
- zip_file.close()
+ with zipfile.ZipFile(zip_name, 'r') as zip_file:
+ print ('Contents of %r:' % zip_name)
+ zip_file.printdir()
os.remove(script_name)
sys.path.insert(0, zip_name)
import test_zipped_doctest