summaryrefslogtreecommitdiffstats
path: root/Lib/test/support
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2019-03-05 08:06:26 (GMT)
committerGitHub <noreply@github.com>2019-03-05 08:06:26 (GMT)
commit5b10b9824780b2181158902067912ee9e7b04657 (patch)
tree1c89bea944e6638eb008c8f106b2ee48cc9448d1 /Lib/test/support
parent9e4861f52349011cd5916eef8e8344575e8ac426 (diff)
downloadcpython-5b10b9824780b2181158902067912ee9e7b04657.zip
cpython-5b10b9824780b2181158902067912ee9e7b04657.tar.gz
cpython-5b10b9824780b2181158902067912ee9e7b04657.tar.bz2
bpo-22831: Use "with" to avoid possible fd leaks in tests (part 2). (GH-10929)
Diffstat (limited to 'Lib/test/support')
-rw-r--r--Lib/test/support/__init__.py14
-rw-r--r--Lib/test/support/script_helper.py51
2 files changed, 30 insertions, 35 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index c0938d9..5bd15a2 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -706,9 +706,8 @@ def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
issue if/when we come across it.
"""
- tempsock = socket.socket(family, socktype)
- port = bind_port(tempsock)
- tempsock.close()
+ with socket.socket(family, socktype) as tempsock:
+ port = bind_port(tempsock)
del tempsock
return port
@@ -1785,10 +1784,11 @@ class _MemoryWatchdog:
sys.stderr.flush()
return
- watchdog_script = findfile("memory_watchdog.py")
- self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
- stdin=f, stderr=subprocess.DEVNULL)
- f.close()
+ with f:
+ watchdog_script = findfile("memory_watchdog.py")
+ self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
+ stdin=f,
+ stderr=subprocess.DEVNULL)
self.started = True
def stop(self):
diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py
index 64b25aa..27a47f2 100644
--- a/Lib/test/support/script_helper.py
+++ b/Lib/test/support/script_helper.py
@@ -205,31 +205,28 @@ def make_script(script_dir, script_basename, source, omit_suffix=False):
script_filename += os.extsep + 'py'
script_name = os.path.join(script_dir, script_filename)
# The script should be encoded to UTF-8, the default string encoding
- script_file = open(script_name, 'w', encoding='utf-8')
- script_file.write(source)
- script_file.close()
+ with open(script_name, 'w', encoding='utf-8') as script_file:
+ script_file.write(source)
importlib.invalidate_caches()
return script_name
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
- zip_file = zipfile.ZipFile(zip_name, 'w')
- if name_in_zip is None:
- parts = script_name.split(os.sep)
- if len(parts) >= 2 and parts[-2] == '__pycache__':
- legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
- name_in_zip = os.path.basename(legacy_pyc)
- script_name = legacy_pyc
- else:
- name_in_zip = os.path.basename(script_name)
- zip_file.write(script_name, name_in_zip)
- zip_file.close()
+ with zipfile.ZipFile(zip_name, 'w') as zip_file:
+ if name_in_zip is None:
+ parts = script_name.split(os.sep)
+ if len(parts) >= 2 and parts[-2] == '__pycache__':
+ legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
+ name_in_zip = os.path.basename(legacy_pyc)
+ script_name = legacy_pyc
+ else:
+ name_in_zip = os.path.basename(script_name)
+ zip_file.write(script_name, name_in_zip)
#if test.support.verbose:
- # zip_file = zipfile.ZipFile(zip_name, 'r')
- # print 'Contents of %r:' % zip_name
- # zip_file.printdir()
- # zip_file.close()
+ # with zipfile.ZipFile(zip_name, 'r') as zip_file:
+ # print 'Contents of %r:' % zip_name
+ # zip_file.printdir()
return zip_name, os.path.join(zip_name, name_in_zip)
def make_pkg(pkg_dir, init_source=''):
@@ -252,17 +249,15 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
- zip_file = zipfile.ZipFile(zip_name, 'w')
- for name in pkg_names:
- init_name_in_zip = os.path.join(name, init_basename)
- zip_file.write(init_name, init_name_in_zip)
- zip_file.write(script_name, script_name_in_zip)
- zip_file.close()
+ with zipfile.ZipFile(zip_name, 'w') as zip_file:
+ for name in pkg_names:
+ init_name_in_zip = os.path.join(name, init_basename)
+ zip_file.write(init_name, init_name_in_zip)
+ zip_file.write(script_name, script_name_in_zip)
for name in unlink:
os.unlink(name)
#if test.support.verbose:
- # zip_file = zipfile.ZipFile(zip_name, 'r')
- # print 'Contents of %r:' % zip_name
- # zip_file.printdir()
- # zip_file.close()
+ # with zipfile.ZipFile(zip_name, 'r') as zip_file:
+ # print 'Contents of %r:' % zip_name
+ # zip_file.printdir()
return zip_name, os.path.join(zip_name, script_name_in_zip)