diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2019-03-05 08:06:26 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-05 08:06:26 (GMT) |
commit | 5b10b9824780b2181158902067912ee9e7b04657 (patch) | |
tree | 1c89bea944e6638eb008c8f106b2ee48cc9448d1 /Lib/test/support | |
parent | 9e4861f52349011cd5916eef8e8344575e8ac426 (diff) | |
download | cpython-5b10b9824780b2181158902067912ee9e7b04657.zip cpython-5b10b9824780b2181158902067912ee9e7b04657.tar.gz cpython-5b10b9824780b2181158902067912ee9e7b04657.tar.bz2 |
bpo-22831: Use "with" to avoid possible fd leaks in tests (part 2). (GH-10929)
Diffstat (limited to 'Lib/test/support')
-rw-r--r-- | Lib/test/support/__init__.py | 14 | ||||
-rw-r--r-- | Lib/test/support/script_helper.py | 51 |
2 files changed, 30 insertions, 35 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index c0938d9..5bd15a2 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -706,9 +706,8 @@ def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): issue if/when we come across it. """ - tempsock = socket.socket(family, socktype) - port = bind_port(tempsock) - tempsock.close() + with socket.socket(family, socktype) as tempsock: + port = bind_port(tempsock) del tempsock return port @@ -1785,10 +1784,11 @@ class _MemoryWatchdog: sys.stderr.flush() return - watchdog_script = findfile("memory_watchdog.py") - self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], - stdin=f, stderr=subprocess.DEVNULL) - f.close() + with f: + watchdog_script = findfile("memory_watchdog.py") + self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], + stdin=f, + stderr=subprocess.DEVNULL) self.started = True def stop(self): diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py index 64b25aa..27a47f2 100644 --- a/Lib/test/support/script_helper.py +++ b/Lib/test/support/script_helper.py @@ -205,31 +205,28 @@ def make_script(script_dir, script_basename, source, omit_suffix=False): script_filename += os.extsep + 'py' script_name = os.path.join(script_dir, script_filename) # The script should be encoded to UTF-8, the default string encoding - script_file = open(script_name, 'w', encoding='utf-8') - script_file.write(source) - script_file.close() + with open(script_name, 'w', encoding='utf-8') as script_file: + script_file.write(source) importlib.invalidate_caches() return script_name def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): zip_filename = zip_basename+os.extsep+'zip' zip_name = os.path.join(zip_dir, zip_filename) - zip_file = zipfile.ZipFile(zip_name, 'w') - if name_in_zip is None: - parts = script_name.split(os.sep) - if len(parts) >= 2 and parts[-2] == '__pycache__': - legacy_pyc = make_legacy_pyc(source_from_cache(script_name)) - name_in_zip = os.path.basename(legacy_pyc) - script_name = legacy_pyc - else: - name_in_zip = os.path.basename(script_name) - zip_file.write(script_name, name_in_zip) - zip_file.close() + with zipfile.ZipFile(zip_name, 'w') as zip_file: + if name_in_zip is None: + parts = script_name.split(os.sep) + if len(parts) >= 2 and parts[-2] == '__pycache__': + legacy_pyc = make_legacy_pyc(source_from_cache(script_name)) + name_in_zip = os.path.basename(legacy_pyc) + script_name = legacy_pyc + else: + name_in_zip = os.path.basename(script_name) + zip_file.write(script_name, name_in_zip) #if test.support.verbose: - # zip_file = zipfile.ZipFile(zip_name, 'r') - # print 'Contents of %r:' % zip_name - # zip_file.printdir() - # zip_file.close() + # with zipfile.ZipFile(zip_name, 'r') as zip_file: + # print 'Contents of %r:' % zip_name + # zip_file.printdir() return zip_name, os.path.join(zip_name, name_in_zip) def make_pkg(pkg_dir, init_source=''): @@ -252,17 +249,15 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name)) zip_filename = zip_basename+os.extsep+'zip' zip_name = os.path.join(zip_dir, zip_filename) - zip_file = zipfile.ZipFile(zip_name, 'w') - for name in pkg_names: - init_name_in_zip = os.path.join(name, init_basename) - zip_file.write(init_name, init_name_in_zip) - zip_file.write(script_name, script_name_in_zip) - zip_file.close() + with zipfile.ZipFile(zip_name, 'w') as zip_file: + for name in pkg_names: + init_name_in_zip = os.path.join(name, init_basename) + zip_file.write(init_name, init_name_in_zip) + zip_file.write(script_name, script_name_in_zip) for name in unlink: os.unlink(name) #if test.support.verbose: - # zip_file = zipfile.ZipFile(zip_name, 'r') - # print 'Contents of %r:' % zip_name - # zip_file.printdir() - # zip_file.close() + # with zipfile.ZipFile(zip_name, 'r') as zip_file: + # print 'Contents of %r:' % zip_name + # zip_file.printdir() return zip_name, os.path.join(zip_name, script_name_in_zip) |