summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/distutils/command/bdist.py5
-rw-r--r--Lib/distutils/command/bdist_msi.py20
-rw-r--r--Lib/distutils/command/bdist_wininst.py28
-rw-r--r--Lib/distutils/command/build.py18
-rw-r--r--Lib/distutils/command/build_ext.py30
-rw-r--r--Lib/distutils/command/install.py9
-rw-r--r--Lib/distutils/command/wininst-9.0-amd64.exebin0 -> 76288 bytes
-rw-r--r--Lib/distutils/msvc9compiler.py72
-rw-r--r--Lib/distutils/msvccompiler.py2
-rw-r--r--Lib/distutils/util.py4
-rw-r--r--Lib/inspect.py2
-rw-r--r--Lib/sre_compile.py3
-rw-r--r--Lib/test/test_asynchat.py37
-rw-r--r--Lib/test/test_asyncore.py30
-rw-r--r--Lib/test/test_capi.py5
-rw-r--r--Lib/test/test_ftplib.py31
-rw-r--r--Lib/test/test_httplib.py22
-rw-r--r--Lib/test/test_poplib.py24
-rw-r--r--Lib/test/test_signal.py19
-rw-r--r--Lib/test/test_smtplib.py78
-rw-r--r--Lib/test/test_socket.py65
-rw-r--r--Lib/test/test_socketserver.py2
-rw-r--r--Lib/test/test_ssl.py194
-rw-r--r--Lib/test/test_sundry.py210
-rw-r--r--Lib/test/test_support.py120
-rw-r--r--Lib/test/test_telnetlib.py24
-rw-r--r--Lib/test/test_zlib.py5
-rw-r--r--Lib/types.py14
28 files changed, 575 insertions, 498 deletions
diff --git a/Lib/distutils/command/bdist.py b/Lib/distutils/command/bdist.py
index 69c1b28..e3b047c 100644
--- a/Lib/distutils/command/bdist.py
+++ b/Lib/distutils/command/bdist.py
@@ -91,7 +91,10 @@ class bdist(Command):
def finalize_options(self):
# have to finalize 'plat_name' before 'bdist_base'
if self.plat_name is None:
- self.plat_name = get_platform()
+ if self.skip_build:
+ self.plat_name = get_platform()
+ else:
+ self.plat_name = self.get_finalized_command('build').plat_name
# 'bdist_base' -- parent of per-built-distribution-format
# temporary directories (eg. we'll probably have
diff --git a/Lib/distutils/command/bdist_msi.py b/Lib/distutils/command/bdist_msi.py
index d313a50..aab89cd 100644
--- a/Lib/distutils/command/bdist_msi.py
+++ b/Lib/distutils/command/bdist_msi.py
@@ -9,11 +9,11 @@ Implements the bdist_msi command.
import sys, os
from distutils.core import Command
-from distutils.util import get_platform
from distutils.dir_util import remove_tree
from distutils.sysconfig import get_python_version
from distutils.version import StrictVersion
from distutils.errors import DistutilsOptionError
+from distutils.util import get_platform
from distutils import log
import msilib
from msilib import schema, sequence, text
@@ -87,6 +87,9 @@ class bdist_msi(Command):
user_options = [('bdist-dir=', None,
"temporary directory for creating the distribution"),
+ ('plat-name=', 'p',
+ "platform name to embed in generated filenames "
+ "(default: %s)" % get_platform()),
('keep-temp', 'k',
"keep the pseudo-installation tree around after " +
"creating the distribution archive"),
@@ -116,6 +119,7 @@ class bdist_msi(Command):
def initialize_options(self):
self.bdist_dir = None
+ self.plat_name = None
self.keep_temp = 0
self.no_target_compile = 0
self.no_target_optimize = 0
@@ -139,7 +143,10 @@ class bdist_msi(Command):
else:
self.target_version = short_version
- self.set_undefined_options('bdist', ('dist_dir', 'dist_dir'))
+ self.set_undefined_options('bdist',
+ ('dist_dir', 'dist_dir'),
+ ('plat_name', 'plat_name'),
+ )
if self.pre_install_script:
raise DistutilsOptionError(
@@ -180,7 +187,7 @@ class bdist_msi(Command):
if not target_version:
assert self.skip_build, "Should have already checked this"
target_version = sys.version[0:3]
- plat_specifier = ".%s-%s" % (get_platform(), target_version)
+ plat_specifier = ".%s-%s" % (self.plat_name, target_version)
build = self.get_finalized_command('build')
build.build_lib = os.path.join(build.build_base,
'lib' + plat_specifier)
@@ -633,8 +640,7 @@ class bdist_msi(Command):
def get_installer_filename(self, fullname):
# Factored out to allow overriding in subclasses
- plat = get_platform()
- installer_name = os.path.join(self.dist_dir,
- "%s.%s-py%s.msi" %
- (fullname, plat, self.target_version))
+ base_name = "%s.%s-py%s.msi" % (fullname, self.plat_name,
+ self.target_version)
+ installer_name = os.path.join(self.dist_dir, base_name)
return installer_name
diff --git a/Lib/distutils/command/bdist_wininst.py b/Lib/distutils/command/bdist_wininst.py
index 2d75a38..bf8d022 100644
--- a/Lib/distutils/command/bdist_wininst.py
+++ b/Lib/distutils/command/bdist_wininst.py
@@ -19,6 +19,9 @@ class bdist_wininst(Command):
user_options = [('bdist-dir=', None,
"temporary directory for creating the distribution"),
+ ('plat-name=', 'p',
+ "platform name to embed in generated filenames "
+ "(default: %s)" % get_platform()),
('keep-temp', 'k',
"keep the pseudo-installation tree around after " +
"creating the distribution archive"),
@@ -52,6 +55,7 @@ class bdist_wininst(Command):
def initialize_options(self):
self.bdist_dir = None
+ self.plat_name = None
self.keep_temp = 0
self.no_target_compile = 0
self.no_target_optimize = 0
@@ -78,7 +82,10 @@ class bdist_wininst(Command):
" option must be specified" % (short_version,))
self.target_version = short_version
- self.set_undefined_options('bdist', ('dist_dir', 'dist_dir'))
+ self.set_undefined_options('bdist',
+ ('dist_dir', 'dist_dir'),
+ ('plat_name', 'plat_name'),
+ )
if self.install_script:
for script in self.distribution.scripts:
@@ -104,6 +111,7 @@ class bdist_wininst(Command):
install.root = self.bdist_dir
install.skip_build = self.skip_build
install.warn_dir = 0
+ install.plat_name = self.plat_name
install_lib = self.reinitialize_command('install_lib')
# we do not want to include pyc or pyo files
@@ -121,7 +129,7 @@ class bdist_wininst(Command):
if not target_version:
assert self.skip_build, "Should have already checked this"
target_version = sys.version[0:3]
- plat_specifier = ".%s-%s" % (get_platform(), target_version)
+ plat_specifier = ".%s-%s" % (self.plat_name, target_version)
build = self.get_finalized_command('build')
build.build_lib = os.path.join(build.build_base,
'lib' + plat_specifier)
@@ -267,11 +275,11 @@ class bdist_wininst(Command):
# if we create an installer for a specific python version,
# it's better to include this in the name
installer_name = os.path.join(self.dist_dir,
- "%s.win32-py%s.exe" %
- (fullname, self.target_version))
+ "%s.%s-py%s.exe" %
+ (fullname, self.plat_name, self.target_version))
else:
installer_name = os.path.join(self.dist_dir,
- "%s.win32.exe" % fullname)
+ "%s.%s.exe" % (fullname, self.plat_name))
return installer_name
def get_exe_bytes(self):
@@ -293,9 +301,9 @@ class bdist_wininst(Command):
bv = get_build_version()
else:
if self.target_version < "2.4":
- bv = "6"
+ bv = 6.0
else:
- bv = "7.1"
+ bv = 7.1
else:
# for current version - use authoritative check.
bv = get_build_version()
@@ -304,5 +312,9 @@ class bdist_wininst(Command):
directory = os.path.dirname(__file__)
# we must use a wininst-x.y.exe built with the same C compiler
# used for python. XXX What about mingw, borland, and so on?
- filename = os.path.join(directory, "wininst-%.1f.exe" % bv)
+ if self.plat_name == 'win32':
+ sfix = ''
+ else:
+ sfix = self.plat_name[3:] # strip 'win' - leaves eg '-amd64'
+ filename = os.path.join(directory, "wininst-%.1f%s.exe" % (bv, sfix))
return open(filename, "rb").read()
diff --git a/Lib/distutils/command/build.py b/Lib/distutils/command/build.py
index 4fe95b0..9c2667c 100644
--- a/Lib/distutils/command/build.py
+++ b/Lib/distutils/command/build.py
@@ -6,6 +6,7 @@ __revision__ = "$Id$"
import sys, os
from distutils.core import Command
+from distutils.errors import DistutilsOptionError
from distutils.util import get_platform
@@ -32,6 +33,9 @@ class build(Command):
"build directory for scripts"),
('build-temp=', 't',
"temporary build directory"),
+ ('plat-name=', 'p',
+ "platform name to build for, if supported "
+ "(default: %s)" % get_platform()),
('compiler=', 'c',
"specify the compiler type"),
('debug', 'g',
@@ -59,12 +63,24 @@ class build(Command):
self.build_temp = None
self.build_scripts = None
self.compiler = None
+ self.plat_name = None
self.debug = None
self.force = 0
self.executable = None
def finalize_options(self):
- plat_specifier = ".%s-%s" % (get_platform(), sys.version[0:3])
+ if self.plat_name is None:
+ self.plat_name = get_platform()
+ else:
+ # plat-name only supported for windows (other platforms are
+ # supported via ./configure flags, if at all). Avoid misleading
+ # other platforms.
+ if os.name != 'nt':
+ raise DistutilsOptionError(
+ "--plat-name only supported on Windows (try "
+ "using './configure --help' on your platform)")
+
+ plat_specifier = ".%s-%s" % (self.plat_name, sys.version[0:3])
# Make it so Python 2.x and Python 2.x with --with-pydebug don't
# share the same build directories. Doing so confuses the build
diff --git a/Lib/distutils/command/build_ext.py b/Lib/distutils/command/build_ext.py
index ff84bca..e011219 100644
--- a/Lib/distutils/command/build_ext.py
+++ b/Lib/distutils/command/build_ext.py
@@ -12,6 +12,7 @@ from distutils.errors import *
from distutils.sysconfig import customize_compiler, get_python_version
from distutils.dep_util import newer_group
from distutils.extension import Extension
+from distutils.util import get_platform
from distutils import log
if os.name == 'nt':
@@ -57,6 +58,9 @@ class build_ext(Command):
"directory for compiled extension modules"),
('build-temp=', 't',
"directory for temporary files (build by-products)"),
+ ('plat-name=', 'p',
+ "platform name to cross-compile for, if supported "
+ "(default: %s)" % get_platform()),
('inplace', 'i',
"ignore build-lib and put compiled extensions into the source " +
"directory alongside your pure Python modules"),
@@ -98,6 +102,7 @@ class build_ext(Command):
def initialize_options(self):
self.extensions = None
self.build_lib = None
+ self.plat_name = None
self.build_temp = None
self.inplace = 0
self.package = None
@@ -124,7 +129,9 @@ class build_ext(Command):
('build_temp', 'build_temp'),
('compiler', 'compiler'),
('debug', 'debug'),
- ('force', 'force'))
+ ('force', 'force'),
+ ('plat_name', 'plat_name'),
+ )
if self.package is None:
self.package = self.distribution.ext_package
@@ -167,6 +174,9 @@ class build_ext(Command):
# for Release and Debug builds.
# also Python's library directory must be appended to library_dirs
if os.name == 'nt':
+ # the 'libs' directory is for binary installs - we assume that
+ # must be the *native* platform. But we don't really support
+ # cross-compiling via a binary install anyway, so we let it go.
self.library_dirs.append(os.path.join(sys.exec_prefix, 'libs'))
if self.debug:
self.build_temp = os.path.join(self.build_temp, "Debug")
@@ -177,8 +187,17 @@ class build_ext(Command):
# this allows distutils on windows to work in the source tree
self.include_dirs.append(os.path.join(sys.exec_prefix, 'PC'))
if MSVC_VERSION == 9:
- self.library_dirs.append(os.path.join(sys.exec_prefix,
- 'PCbuild'))
+ # Use the .lib files for the correct architecture
+ if self.plat_name == 'win32':
+ suffix = ''
+ else:
+ # win-amd64 or win-ia64
+ suffix = self.plat_name[4:]
+ new_lib = os.path.join(sys.exec_prefix, 'PCbuild')
+ if suffix:
+ new_lib = os.path.join(new_lib, suffix)
+ self.library_dirs.append(new_lib)
+
elif MSVC_VERSION == 8:
self.library_dirs.append(os.path.join(sys.exec_prefix,
'PC', 'VS8.0', 'win32release'))
@@ -267,6 +286,11 @@ class build_ext(Command):
dry_run=self.dry_run,
force=self.force)
customize_compiler(self.compiler)
+ # If we are cross-compiling, init the compiler now (if we are not
+ # cross-compiling, init would not hurt, but people may rely on
+ # late initialization of compiler even if they shouldn't...)
+ if os.name == 'nt' and self.plat_name != get_platform():
+ self.compiler.initialize(self.plat_name)
# And make sure that any compile/link-related options (which might
# come from the command-line or from the setup script) are set in
diff --git a/Lib/distutils/command/install.py b/Lib/distutils/command/install.py
index e4ee680..50884c3 100644
--- a/Lib/distutils/command/install.py
+++ b/Lib/distutils/command/install.py
@@ -13,6 +13,7 @@ from distutils.sysconfig import get_config_vars
from distutils.errors import DistutilsPlatformError
from distutils.file_util import write_file
from distutils.util import convert_path, subst_vars, change_root
+from distutils.util import get_platform
from distutils.errors import DistutilsOptionError
if sys.version < "2.2":
@@ -485,6 +486,14 @@ class install (Command):
# Obviously have to build before we can install
if not self.skip_build:
self.run_command('build')
+ # If we built for any other platform, we can't install.
+ build_plat = self.distribution.get_command_obj('build').plat_name
+ # check warn_dir - it is a clue that the 'install' is happening
+ # internally, and not to sys.path, so we don't check the platform
+ # matches what we are running.
+ if self.warn_dir and build_plat != get_platform():
+ raise DistutilsPlatformError("Can't install when "
+ "cross-compiling")
# Run all sub-commands (at least those that need to be run)
for cmd_name in self.get_sub_commands():
diff --git a/Lib/distutils/command/wininst-9.0-amd64.exe b/Lib/distutils/command/wininst-9.0-amd64.exe
new file mode 100644
index 0000000..c99ede4
--- /dev/null
+++ b/Lib/distutils/command/wininst-9.0-amd64.exe
Binary files differ
diff --git a/Lib/distutils/msvc9compiler.py b/Lib/distutils/msvc9compiler.py
index 828d7fb..8b1cf9a 100644
--- a/Lib/distutils/msvc9compiler.py
+++ b/Lib/distutils/msvc9compiler.py
@@ -22,6 +22,7 @@ from distutils.errors import (DistutilsExecError, DistutilsPlatformError,
from distutils.ccompiler import (CCompiler, gen_preprocess_options,
gen_lib_options)
from distutils import log
+from distutils.util import get_platform
import _winreg
@@ -38,13 +39,15 @@ HKEYS = (_winreg.HKEY_USERS,
VS_BASE = r"Software\Microsoft\VisualStudio\%0.1f"
WINSDK_BASE = r"Software\Microsoft\Microsoft SDKs\Windows"
NET_BASE = r"Software\Microsoft\.NETFramework"
-ARCHS = {'DEFAULT' : 'x86',
- 'intel' : 'x86', 'x86' : 'x86',
- 'amd64' : 'x64', 'x64' : 'x64',
- 'itanium' : 'ia64', 'ia64' : 'ia64',
- }
-# The globals VERSION, ARCH, MACROS and VC_ENV are defined later
+# A map keyed by get_platform() return values to values accepted by
+# 'vcvarsall.bat'. Note a cross-compile may combine these (eg, 'x86_amd64' is
+# the param to cross-compile on x86 targetting amd64.)
+PLAT_TO_VCVARS = {
+ 'win32' : 'x86',
+ 'win-amd64' : 'amd64',
+ 'win-ia64' : 'ia64',
+}
class Reg:
"""Helper class to read values from the registry
@@ -176,23 +179,6 @@ def get_build_version():
# else we don't know what version of the compiler this is
return None
-def get_build_architecture():
- """Return the processor architecture.
-
- Possible results are "x86" or "amd64".
- """
- prefix = " bit ("
- i = sys.version.find(prefix)
- if i == -1:
- return "x86"
- j = sys.version.find(")", i)
- sysarch = sys.version[i+len(prefix):j].lower()
- arch = ARCHS.get(sysarch, None)
- if arch is None:
- return ARCHS['DEFAULT']
- else:
- return arch
-
def normalize_and_reduce_paths(paths):
"""Return a list of normalized paths with duplicates removed.
@@ -251,6 +237,7 @@ def query_vcvarsall(version, arch="x86"):
if vcvarsall is None:
raise IOError("Unable to find vcvarsall.bat")
+ log.debug("Calling 'vcvarsall.bat %s' (version=%s)", arch, version)
popen = subprocess.Popen('"%s" %s & set' % (vcvarsall, arch),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -281,9 +268,7 @@ def query_vcvarsall(version, arch="x86"):
VERSION = get_build_version()
if VERSION < 8.0:
raise DistutilsPlatformError("VC %0.1f is not supported by this module" % VERSION)
-ARCH = get_build_architecture()
# MACROS = MacroExpander(VERSION)
-VC_ENV = query_vcvarsall(VERSION, ARCH)
class MSVCCompiler(CCompiler) :
"""Concrete class that implements an interface to Microsoft Visual C++,
@@ -318,13 +303,25 @@ class MSVCCompiler(CCompiler) :
def __init__(self, verbose=0, dry_run=0, force=0):
CCompiler.__init__ (self, verbose, dry_run, force)
self.__version = VERSION
- self.__arch = ARCH
self.__root = r"Software\Microsoft\VisualStudio"
# self.__macros = MACROS
self.__path = []
+ # target platform (.plat_name is consistent with 'bdist')
+ self.plat_name = None
+ self.__arch = None # deprecated name
self.initialized = False
- def initialize(self):
+ def initialize(self, plat_name=None):
+ # multi-init means we would need to check platform same each time...
+ assert not self.initialized, "don't init multiple times"
+ if plat_name is None:
+ plat_name = get_platform()
+ # sanity check for platforms to prevent obscure errors later.
+ ok_plats = 'win32', 'win-amd64', 'win-ia64'
+ if plat_name not in ok_plats:
+ raise DistutilsPlatformError("--plat-name must be one of %s" %
+ (ok_plats,))
+
if "DISTUTILS_USE_SDK" in os.environ and "MSSdk" in os.environ and self.find_exe("cl.exe"):
# Assume that the SDK set up everything alright; don't try to be
# smarter
@@ -334,9 +331,24 @@ class MSVCCompiler(CCompiler) :
self.rc = "rc.exe"
self.mc = "mc.exe"
else:
- self.__paths = VC_ENV['path'].split(os.pathsep)
- os.environ['lib'] = VC_ENV['lib']
- os.environ['include'] = VC_ENV['include']
+ # On x86, 'vcvars32.bat amd64' creates an env that doesn't work;
+ # to cross compile, you use 'x86_amd64'.
+ # On AMD64, 'vcvars32.bat amd64' is a native build env; to cross
+ # compile use 'x86' (ie, it runs the x86 compiler directly)
+ # No idea how itanium handles this, if at all.
+ if plat_name == get_platform() or plat_name == 'win32':
+ # native build or cross-compile to win32
+ plat_spec = PLAT_TO_VCVARS[plat_name]
+ else:
+ # cross compile from win32 -> some 64bit
+ plat_spec = PLAT_TO_VCVARS[get_platform()] + '_' + \
+ PLAT_TO_VCVARS[plat_name]
+
+ vc_env = query_vcvarsall(VERSION, plat_spec)
+
+ self.__paths = vc_env['path'].split(os.pathsep)
+ os.environ['lib'] = vc_env['lib']
+ os.environ['include'] = vc_env['include']
if len(self.__paths) == 0:
raise DistutilsPlatformError("Python was built with %s, "
diff --git a/Lib/distutils/msvccompiler.py b/Lib/distutils/msvccompiler.py
index 3b4e9c9..71146dc 100644
--- a/Lib/distutils/msvccompiler.py
+++ b/Lib/distutils/msvccompiler.py
@@ -638,5 +638,5 @@ if get_build_version() >= 8.0:
log.debug("Importing new compiler from distutils.msvc9compiler")
OldMSVCCompiler = MSVCCompiler
from distutils.msvc9compiler import MSVCCompiler
- from distutils.msvc9compiler import get_build_architecture
+ # get_build_architecture not really relevant now we support cross-compile
from distutils.msvc9compiler import MacroExpander
diff --git a/Lib/distutils/util.py b/Lib/distutils/util.py
index 917f1d0..72039a7 100644
--- a/Lib/distutils/util.py
+++ b/Lib/distutils/util.py
@@ -30,7 +30,7 @@ def get_platform ():
irix64-6.2
Windows will return one of:
- win-x86_64 (64bit Windows on x86_64 (AMD64))
+ win-amd64 (64bit Windows on AMD64 (aka x86_64, Intel64, EM64T, etc)
win-ia64 (64bit Windows on Itanium)
win32 (all others - specifically, sys.platform is returned)
@@ -45,7 +45,7 @@ def get_platform ():
j = sys.version.find(")", i)
look = sys.version[i+len(prefix):j].lower()
if look == 'amd64':
- return 'win-x86_64'
+ return 'win-amd64'
if look == 'itanium':
return 'win-ia64'
return sys.platform
diff --git a/Lib/inspect.py b/Lib/inspect.py
index 5cb958b..6039d4e 100644
--- a/Lib/inspect.py
+++ b/Lib/inspect.py
@@ -247,7 +247,7 @@ def isgenerator(object):
def isabstract(object):
"""Return true if the object is an abstract base class (ABC)."""
- return object.__flags__ & TPFLAGS_IS_ABSTRACT
+ return isinstance(object, type) and object.__flags__ & TPFLAGS_IS_ABSTRACT
def getmembers(object, predicate=None):
"""Return all members of an object as (name, value) pairs sorted by name.
diff --git a/Lib/sre_compile.py b/Lib/sre_compile.py
index 4f62416..3d74c3a 100644
--- a/Lib/sre_compile.py
+++ b/Lib/sre_compile.py
@@ -11,7 +11,7 @@
"""Internal support module for sre"""
import _sre, sys
-
+import sre_parse
from sre_constants import *
assert _sre.MAGIC == MAGIC, "SRE module mismatch"
@@ -493,7 +493,6 @@ def compile(p, flags=0):
# internal: convert pattern list to internal format
if isstring(p):
- import sre_parse
pattern = p
p = sre_parse.parse(p, flags)
else:
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py
index 2912057..d24e8cc 100644
--- a/Lib/test/test_asynchat.py
+++ b/Lib/test/test_asynchat.py
@@ -6,8 +6,7 @@ import unittest
import sys
from test import test_support
-HOST = "127.0.0.1"
-PORT = 54322
+HOST = test_support.HOST
SERVER_QUIT = b'QUIT\n'
class echo_server(threading.Thread):
@@ -18,15 +17,13 @@ class echo_server(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.port = test_support.bind_port(self.sock)
def run(self):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(sock, HOST, PORT)
- sock.listen(1)
+ self.sock.listen(1)
self.event.set()
- conn, client = sock.accept()
+ conn, client = self.sock.accept()
self.buffer = b""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
@@ -50,15 +47,15 @@ class echo_server(threading.Thread):
pass
conn.close()
- sock.close()
+ self.sock.close()
class echo_client(asynchat.async_chat):
- def __init__(self, terminator):
+ def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect((HOST, PORT))
+ self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = b""
@@ -106,7 +103,7 @@ class TestAsynchat(unittest.TestCase):
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
- c = echo_client(term)
+ c = echo_client(term, s.port)
c.push(b"hello ")
c.push(bytes("world%s" % term, "ascii"))
c.push(bytes("I'm not dead yet!%s" % term, "ascii"))
@@ -138,7 +135,7 @@ class TestAsynchat(unittest.TestCase):
def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(termlen)
+ c = echo_client(termlen, s.port)
data = b"hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
@@ -158,7 +155,7 @@ class TestAsynchat(unittest.TestCase):
def test_none_terminator(self):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(None)
+ c = echo_client(None, s.port)
data = b"hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
@@ -170,7 +167,7 @@ class TestAsynchat(unittest.TestCase):
def test_simple_producer(self):
s, event = start_echo_server()
- c = echo_client(b'\n')
+ c = echo_client(b'\n', s.port)
data = b"hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p)
@@ -181,7 +178,7 @@ class TestAsynchat(unittest.TestCase):
def test_string_producer(self):
s, event = start_echo_server()
- c = echo_client(b'\n')
+ c = echo_client(b'\n', s.port)
data = b"hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
@@ -192,8 +189,8 @@ class TestAsynchat(unittest.TestCase):
def test_empty_line(self):
# checks that empty lines are handled correctly
s, event = start_echo_server()
- c = echo_client(b'\n')
- c.push(b"hello world\n\nI'm not dead yet!\n")
+ c = echo_client(b'\n', s.port)
+ c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
@@ -203,8 +200,8 @@ class TestAsynchat(unittest.TestCase):
def test_close_when_done(self):
s, event = start_echo_server()
- c = echo_client(b'\n')
- c.push(b"hello world\nI'm not dead yet!\n")
+ c = echo_client(b'\n', s.port)
+ c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 6dc73ad..09401bd 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -9,10 +9,10 @@ import time
from test import test_support
from test.test_support import TESTFN, run_unittest, unlink
-from io import StringIO, BytesIO
+from io import BytesIO
+from io import StringIO
-HOST = "127.0.0.1"
-PORT = None
+HOST = test_support.HOST
class dummysocket:
def __init__(self):
@@ -52,14 +52,8 @@ class crashingdummy:
self.error_handled = True
# used when testing senders; just collects what it gets until newline is sent
-def capture_server(evt, buf):
+def capture_server(evt, buf, serv):
try:
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
serv.listen(5)
conn, addr = serv.accept()
except socket.timeout:
@@ -80,7 +74,6 @@ def capture_server(evt, buf):
conn.close()
finally:
serv.close()
- PORT = None
evt.set()
@@ -339,14 +332,13 @@ class DispatcherWithSendTests(unittest.TestCase):
def test_send(self):
self.evt = threading.Event()
- cap = BytesIO()
- threading.Thread(target=capture_server, args=(self.evt, cap)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
- # wait until server thread has assigned a port number
- n = 1000
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
+ cap = BytesIO()
+ args = (self.evt, cap, self.sock)
+ threading.Thread(target=capture_server, args=args).start()
# wait a little longer for the server to initialize (it sometimes
# refuses connections on slow machines without this wait)
@@ -355,7 +347,7 @@ class DispatcherWithSendTests(unittest.TestCase):
data = b"Suppose there isn't a 16-ton weight?"
d = dispatcherwithsend_noread()
d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- d.connect((HOST, PORT))
+ d.connect((HOST, self.port))
# give time for socket to connect
time.sleep(0.1)
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index 107d4b4..87be9ee 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -16,9 +16,6 @@ def test_main():
# some extra thread-state tests driven via _testcapi
def TestThreadState():
- import thread
- import time
-
if test_support.verbose:
print("auto-thread-state")
@@ -42,6 +39,8 @@ def test_main():
have_thread_state = False
if have_thread_state:
+ import thread
+ import time
TestThreadState()
import threading
t=threading.Thread(target=TestThreadState)
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index 1ff8d08..0204c6f 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -6,18 +6,13 @@ import time
from unittest import TestCase
from test import test_support
-server_port = None
+HOST = test_support.HOST
# This function sets the evt 3 times:
# 1) when the connection is ready to be accepted.
# 2) when it is safe for the caller to close the connection
# 3) when we have closed the socket
-def server(evt):
- global server_port
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_port = test_support.bind_port(serv, "", 9091)
+def server(evt, serv):
serv.listen(5)
# (1) Signal the caller that we are ready to accept the connection.
@@ -40,14 +35,16 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
# Wait for the server to be ready.
self.evt.wait()
self.evt.clear()
- ftplib.FTP.port = server_port
+ ftplib.FTP.port = self.port
def tearDown(self):
- # Wait on the closing of the socket (this shouldn't be necessary).
self.evt.wait()
def testBasic(self):
@@ -55,34 +52,34 @@ class GeneralTests(TestCase):
ftplib.FTP()
# connects
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.evt.wait()
ftp.sock.close()
def testTimeoutDefault(self):
# default
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.assertTrue(ftp.sock.gettimeout() is None)
self.evt.wait()
ftp.sock.close()
def testTimeoutValue(self):
# a value
- ftp = ftplib.FTP("localhost", timeout=30)
+ ftp = ftplib.FTP(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutConnect(self):
ftp = ftplib.FTP()
- ftp.connect("localhost", timeout=30)
+ ftp.connect(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutDifferentOrder(self):
ftp = ftplib.FTP(timeout=30)
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
@@ -90,7 +87,7 @@ class GeneralTests(TestCase):
def testTimeoutDirectAccess(self):
ftp = ftplib.FTP()
ftp.timeout = 30
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
@@ -100,7 +97,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- ftp = ftplib.FTP("localhost", timeout=None)
+ ftp = ftplib.FTP(HOST, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(ftp.sock.gettimeout(), 30)
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index ca801da..f0e551f 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -6,6 +6,8 @@ from unittest import TestCase
from test import test_support
+HOST = test_support.HOST
+
class FakeSocket:
def __init__(self, text, fileclass=io.BytesIO):
if isinstance(text, str):
@@ -199,16 +201,12 @@ class OfflineTest(TestCase):
def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
-PORT = 50003
-HOST = "localhost"
-
class TimeoutTest(TestCase):
+ PORT = None
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ TimeoutTest.PORT = test_support.bind_port(self.serv)
self.serv.listen(5)
def tearDown(self):
@@ -220,13 +218,13 @@ class TimeoutTest(TestCase):
# and into the socket.
# default
- httpConn = httplib.HTTPConnection(HOST, PORT)
+ httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
httpConn.connect()
self.assertTrue(httpConn.sock.gettimeout() is None)
httpConn.close()
# a value
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=30)
+ httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
httpConn.connect()
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
@@ -235,7 +233,8 @@ class TimeoutTest(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=None)
+ httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
+ timeout=None)
httpConn.connect()
finally:
socket.setdefaulttimeout(previous)
@@ -249,11 +248,12 @@ class HTTPSTimeoutTest(TestCase):
def test_attributes(self):
# simple test to check it's storing it
if hasattr(httplib, 'HTTPSConnection'):
- h = httplib.HTTPSConnection(HOST, PORT, timeout=30)
+ h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
self.assertEqual(h.timeout, 30)
def test_main(verbose=None):
- test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTimeoutTest)
+ test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
+ HTTPSTimeoutTest)
if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 983cf21..cd550df 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -6,14 +6,10 @@ import time
from unittest import TestCase
from test import test_support
+HOST = test_support.HOST
-def server(ready, evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 9091))
+def server(evt, serv):
serv.listen(5)
- ready.set()
try:
conn, addr = serv.accept()
except socket.timeout:
@@ -29,27 +25,29 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- self.ready = threading.Event()
- threading.Thread(target=server, args=(self.ready, self.evt,)).start()
- self.ready.wait()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
+ time.sleep(.1)
def tearDown(self):
self.evt.wait()
def testBasic(self):
# connects
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
pop.sock.close()
def testTimeoutDefault(self):
# default
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
self.assertTrue(pop.sock.gettimeout() is None)
pop.sock.close()
def testTimeoutValue(self):
# a value
- pop = poplib.POP3("localhost", 9091, timeout=30)
+ pop = poplib.POP3(HOST, self.port, timeout=30)
self.assertEqual(pop.sock.gettimeout(), 30)
pop.sock.close()
@@ -58,7 +56,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- pop = poplib.POP3("localhost", 9091, timeout=None)
+ pop = poplib.POP3(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(pop.sock.gettimeout(), 30)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index a410710..339cec2 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -48,16 +48,21 @@ class InterProcessSignalTests(unittest.TestCase):
if self.using_gc:
gc.enable()
- def handlerA(self, *args):
+ def format_frame(self, frame, limit=None):
+ return ''.join(traceback.format_stack(frame, limit=limit))
+
+ def handlerA(self, signum, frame):
self.a_called = True
if test_support.verbose:
- print("handlerA invoked", args)
+ print("handlerA invoked from signal %s at:\n%s" % (
+ signum, self.format_frame(frame, limit=1)))
- def handlerB(self, *args):
+ def handlerB(self, signum, frame):
self.b_called = True
if test_support.verbose:
- print("handlerB invoked", args)
- raise HandlerBCalled(*args)
+ print ("handlerB invoked from signal %s at:\n%s" % (
+ signum, self.format_frame(frame, limit=1)))
+ raise HandlerBCalled(signum, self.format_frame(frame))
def wait(self, child):
"""Wait for child to finish, ignoring EINTR."""
@@ -95,6 +100,10 @@ class InterProcessSignalTests(unittest.TestCase):
self.assertFalse(self.b_called)
self.a_called = False
+ # Make sure the signal isn't delivered while the previous
+ # Popen object is being destroyed, because __del__ swallows
+ # exceptions.
+ del child
try:
child = subprocess.Popen(['kill', '-USR1', str(pid)])
# This wait should be interrupted by the signal's exception.
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index e1c198f5..9dc672d 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -12,18 +12,9 @@ import select
from unittest import TestCase
from test import test_support
-# PORT is used to communicate the port number assigned to the server
-# to the test client
-HOST = "localhost"
-PORT = None
-
-def server(evt, buf):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(15)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
+HOST = test_support.HOST
+
+def server(evt, buf, serv):
serv.listen(5)
evt.set()
try:
@@ -43,14 +34,16 @@ def server(evt, buf):
conn.close()
finally:
serv.close()
- PORT = None
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- servargs = (self.evt, b"220 Hola mundo\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, b"220 Hola mundo\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
@@ -60,29 +53,29 @@ class GeneralTests(TestCase):
def testBasic1(self):
# connects
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
smtp.sock.close()
def testBasic2(self):
# connects, include port in host name
- smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
+ smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
smtp.sock.close()
def testLocalHostName(self):
# check that supplied local_hostname is used
- smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
def testTimeoutDefault(self):
# default
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close()
def testTimeoutValue(self):
# a value
- smtp = smtplib.SMTP(HOST, PORT, timeout=30)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
@@ -91,7 +84,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- smtp = smtplib.SMTP(HOST, PORT, timeout=None)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30)
@@ -99,10 +92,7 @@ class GeneralTests(TestCase):
# Test server thread using the specified SMTP server class
-def debugging_server(server_class, serv_evt, client_evt):
- serv = server_class(("", 0), ('nowhere', -1))
- global PORT
- PORT = serv.getsockname()[1]
+def debugging_server(serv, serv_evt, client_evt):
serv_evt.set()
try:
@@ -131,7 +121,6 @@ def debugging_server(server_class, serv_evt, client_evt):
time.sleep(0.5)
serv.close()
asyncore.close_all()
- PORT = None
serv_evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
@@ -153,7 +142,9 @@ class DebuggingServerTests(TestCase):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (smtpd.DebuggingServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = smtpd.DebuggingServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
@@ -170,31 +161,31 @@ class DebuggingServerTests(TestCase):
def testBasic(self):
# connect
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.quit()
def testNOOP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, b'Ok')
self.assertEqual(smtp.noop(), expected)
smtp.quit()
def testRSET(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, b'Ok')
self.assertEqual(smtp.rset(), expected)
smtp.quit()
def testNotImplemented(self):
# EHLO isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, b'Error: command "EHLO" not implemented')
self.assertEqual(smtp.ehlo(), expected)
smtp.quit()
def testVRFY(self):
# VRFY isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, b'Error: command "VRFY" not implemented')
self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
@@ -203,21 +194,21 @@ class DebuggingServerTests(TestCase):
def testSecondHELO(self):
# check that a second HELO returns a message that it's a duplicate
# (this behavior is specific to smtpd.SMTPChannel)
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.helo()
expected = (503, b'Duplicate HELO/EHLO')
self.assertEqual(smtp.helo(), expected)
smtp.quit()
def testHELP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented')
smtp.quit()
def testSend(self):
# connect and send mail
m = 'A test message'
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.sendmail('John', 'Sally', m)
smtp.quit()
@@ -257,7 +248,10 @@ class BadHELOServerTests(TestCase):
sys.stdout = self.output
self.evt = threading.Event()
- servargs = (self.evt, b"199 no hello for you!\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, b"199 no hello for you!\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
@@ -268,7 +262,7 @@ class BadHELOServerTests(TestCase):
def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
- HOST, PORT, 'localhost', 3)
+ HOST, self.port, 'localhost', 3)
sim_users = {'Mr.A@somewhere.com':'John A',
@@ -333,7 +327,9 @@ class SMTPSimTests(TestCase):
def setUp(self):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (SimSMTPServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
@@ -348,11 +344,11 @@ class SMTPSimTests(TestCase):
def testBasic(self):
# smoke test
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.quit()
def testEHLO(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
# no features should be present before the EHLO
self.assertEqual(smtp.esmtp_features, {})
@@ -373,7 +369,7 @@ class SMTPSimTests(TestCase):
smtp.quit()
def testVRFY(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for email, name in sim_users.items():
expected_known = (250, bytes('%s %s' %
@@ -388,7 +384,7 @@ class SMTPSimTests(TestCase):
smtp.quit()
def testEXPN(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for listname, members in sim_lists.items():
users = []
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 2bec373..8c5cf93 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -3,6 +3,7 @@
import unittest
from test import test_support
+import errno
import socket
import select
import thread, threading
@@ -15,17 +16,14 @@ import array
from weakref import proxy
import signal
-PORT = 50007
-HOST = 'localhost'
+HOST = test_support.HOST
MSG = b'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
self.serv.listen(1)
def tearDown(self):
@@ -36,9 +34,7 @@ class SocketUDPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
def tearDown(self):
self.serv.close()
@@ -190,7 +186,7 @@ class SocketConnectedTest(ThreadedTCPSocketTest):
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
self.serv_conn = self.cli
def clientTearDown(self):
@@ -470,16 +466,23 @@ class GeneralModuleTests(unittest.TestCase):
# XXX The following don't test module-level functionality...
def testSockName(self):
- # Testing getsockname()
+ # Testing getsockname(). Use a temporary socket to elicit an unused
+ # ephemeral port that we can use later in the test.
+ tempsock = socket.socket()
+ tempsock.bind(("0.0.0.0", 0))
+ (host, port) = tempsock.getsockname()
+ tempsock.close()
+ del tempsock
+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(("0.0.0.0", PORT+1))
+ sock.bind(("0.0.0.0", port))
name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
my_ip_addr = socket.gethostbyname(socket.gethostname())
self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
- self.assertEqual(name[1], PORT+1)
+ self.assertEqual(name[1], port)
def testGetSockOpt(self):
# Testing getsockopt()
@@ -615,7 +618,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG)
def _testSendtoAndRecv(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFrom(self):
# Testing recvfrom() over UDP
@@ -623,14 +626,14 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFromNegative(self):
# Negative lengths passed to recvfrom should give ValueError.
self.assertRaises(ValueError, self.serv.recvfrom, -1)
def _testRecvFromNegative(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
class TCPCloserTest(ThreadedTCPSocketTest):
@@ -648,7 +651,7 @@ class TCPCloserTest(ThreadedTCPSocketTest):
conn.close()
def _testClose(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
@@ -706,7 +709,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testAccept(self):
time.sleep(0.1)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testConnect(self):
# Testing non-blocking connect
@@ -714,7 +717,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testConnect(self):
self.cli.settimeout(10)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testRecv(self):
# Testing non-blocking recv
@@ -734,7 +737,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.fail("Error during select call to non-blocking socket.")
def _testRecv(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(0.1)
self.cli.send(MSG)
@@ -883,7 +886,9 @@ class NetworkConnectionTest(object):
"""Prove network connection."""
def clientSetUp(self):
- self.cli = socket.create_connection((HOST, PORT))
+ # We're inherited below by BasicTCPTest2, which also inherits
+ # BasicTCPTest, which defines self.port referenced below.
+ self.cli = socket.create_connection((HOST, self.port))
self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
@@ -893,7 +898,11 @@ class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
class NetworkConnectionNoServer(unittest.TestCase):
def testWithoutServer(self):
- self.failUnlessRaises(socket.error, lambda: socket.create_connection((HOST, PORT)))
+ port = test_support.find_unused_port()
+ self.failUnlessRaises(
+ socket.error,
+ lambda: socket.create_connection((HOST, port))
+ )
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
@@ -914,22 +923,22 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
testFamily = _justAccept
def _testFamily(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.family, 2)
testTimeoutDefault = _justAccept
def _testTimeoutDefault(self):
- self.cli = socket.create_connection((HOST, PORT))
+ self.cli = socket.create_connection((HOST, self.port))
self.assertTrue(self.cli.gettimeout() is None)
testTimeoutValueNamed = _justAccept
def _testTimeoutValueNamed(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutValueNonamed = _justAccept
def _testTimeoutValueNonamed(self):
- self.cli = socket.create_connection((HOST, PORT), 30)
+ self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutNone = _justAccept
@@ -937,7 +946,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- self.cli = socket.create_connection((HOST, PORT), timeout=None)
+ self.cli = socket.create_connection((HOST, self.port), timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(self.cli.gettimeout(), 30)
@@ -964,12 +973,12 @@ class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
testOutsideTimeout = testInsideTimeout
def _testInsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT))
+ self.cli = sock = socket.create_connection((HOST, self.port))
data = sock.recv(5)
self.assertEqual(data, b"done!")
def _testOutsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT), timeout=1)
+ self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 0656176..3946078 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -22,7 +22,7 @@ from test.test_support import TESTFN as TEST_FILE
test.test_support.requires("network")
TEST_STR = b"hello world\n"
-HOST = "localhost"
+HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 520f440..99ed00f 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -25,11 +25,10 @@ try:
except ImportError:
skip_expected = True
+HOST = test_support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
-TESTPORT = 10025
-
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
if test_support.verbose:
@@ -299,7 +298,7 @@ else:
except:
handle_error('')
- def __init__(self, port, certificate, ssl_version=None,
+ def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None, expect_bad_connects=False,
chatty=True, connectionchatty=False, starttls_server=False):
if ssl_version is None:
@@ -315,12 +314,8 @@ else:
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
+ self.port = test_support.bind_port(self.sock)
self.flag = None
- if hasattr(socket, 'SO_REUSEADDR'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(False)
@@ -471,12 +466,13 @@ else:
format%args))
- def __init__(self, port, certfile):
+ def __init__(self, certfile):
self.flag = None
self.active = False
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
+ self.port = test_support.find_unused_port()
self.server = self.HTTPSServer(
- ('', port), self.RootedHTTPRequestHandler, certfile)
+ (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
threading.Thread.__init__(self)
self.setDaemon(True)
@@ -586,7 +582,7 @@ else:
self.server.close()
def badCertTest (certfile):
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_REQUIRED,
cacerts=CERTFILE, chatty=False,
connectionchatty=False)
@@ -600,7 +596,7 @@ else:
s = ssl.wrap_socket(socket.socket(),
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError as x:
if test_support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x)
@@ -616,7 +612,7 @@ else:
indata="FOO\n",
chatty=False, connectionchatty=False):
- server = ThreadedEchoServer(TESTPORT, certfile,
+ server = ThreadedEchoServer(certfile,
certreqs=certreqs,
ssl_version=protocol,
cacerts=cacertsfile,
@@ -631,12 +627,11 @@ else:
client_protocol = protocol
try:
s = ssl.wrap_socket(socket.socket(),
- server_side=False,
certfile=client_certfile,
ca_certs=cacertsfile,
cert_reqs=certreqs,
ssl_version=client_protocol)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError as x:
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
except Exception as x:
@@ -646,18 +641,17 @@ else:
if test_support.verbose:
sys.stdout.write(
" client: sending %s...\n" % (repr(indata)))
- s.write(indata.encode('ASCII', 'strict'))
+ s.write(indata)
outdata = s.read()
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: read %s\n" % repr(outdata))
- outdata = str(outdata, 'ASCII', 'strict')
if outdata != indata.lower():
raise test_support.TestFailed(
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
- % (repr(outdata[:min(len(outdata),20)]), len(outdata),
- repr(indata[:min(len(indata),20)].lower()), len(indata)))
- s.write("over\n".encode("ASCII", "strict"))
+ % (outdata[:min(len(outdata),20)], len(outdata),
+ indata[:min(len(indata),20)].lower(), len(indata)))
+ s.write("over\n")
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
@@ -703,7 +697,44 @@ else:
class ThreadedTests(unittest.TestCase):
- def testEcho (self):
+ def testRudeShutdown(self):
+
+ listener_ready = threading.Event()
+ listener_gone = threading.Event()
+ port = test_support.find_unused_port()
+
+ # `listener` runs in a thread. It opens a socket listening on
+ # PORT, and sits in an accept() until the main thread connects.
+ # Then it rudely closes the socket, and sets Event `listener_gone`
+ # to let the main thread know the socket is gone.
+ def listener():
+ s = socket.socket()
+ s.bind((HOST, port))
+ s.listen(5)
+ listener_ready.set()
+ s.accept()
+ s = None # reclaim the socket object, which also closes it
+ listener_gone.set()
+
+ def connector():
+ listener_ready.wait()
+ s = socket.socket()
+ s.connect((HOST, port))
+ listener_gone.wait()
+ try:
+ ssl_sock = ssl.wrap_socket(s)
+ except IOError:
+ pass
+ else:
+ raise test_support.TestFailed(
+ 'connecting to closed SSL socket should have failed')
+
+ t = threading.Thread(target=listener)
+ t.start()
+ connector()
+ t.join()
+
+ def testEcho(self):
if test_support.verbose:
sys.stdout.write("\n")
@@ -716,7 +747,7 @@ else:
if test_support.verbose:
sys.stdout.write("\n")
s2 = socket.socket()
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23,
cacerts=CERTFILE,
@@ -733,7 +764,7 @@ else:
ca_certs=CERTFILE,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_SSLv23)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError as x:
raise test_support.TestFailed(
"Unexpected SSL error: " + str(x))
@@ -776,46 +807,6 @@ else:
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
"badkey.pem"))
- def testRudeShutdown(self):
-
- listener_ready = threading.Event()
- listener_gone = threading.Event()
-
- # `listener` runs in a thread. It opens a socket listening on
- # PORT, and sits in an accept() until the main thread connects.
- # Then it rudely closes the socket, and sets Event `listener_gone`
- # to let the main thread know the socket is gone.
- def listener():
- s = socket.socket()
- if hasattr(socket, 'SO_REUSEADDR'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- s.bind(('127.0.0.1', TESTPORT))
- s.listen(5)
- listener_ready.set()
- s.accept()
- s = None # reclaim the socket object, which also closes it
- listener_gone.set()
-
- def connector():
- listener_ready.wait()
- s = socket.socket()
- s.connect(('127.0.0.1', TESTPORT))
- listener_gone.wait()
- try:
- ssl_sock = ssl.wrap_socket(s)
- except IOError:
- pass
- else:
- raise test_support.TestFailed(
- 'connecting to closed SSL socket should have failed')
-
- t = threading.Thread(target=listener)
- t.start()
- connector()
- t.join()
-
def testProtocolSSL2(self):
if test_support.verbose:
sys.stdout.write("\n")
@@ -873,7 +864,7 @@ else:
msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
ssl_version=ssl.PROTOCOL_TLSv1,
starttls_server=True,
chatty=True,
@@ -888,7 +879,7 @@ else:
try:
s = socket.socket()
s.setblocking(1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except Exception as x:
raise test_support.TestFailed("Unexpected exception: " + str(x))
else:
@@ -936,7 +927,8 @@ else:
def testSocketServer(self):
- server = OurHTTPSServer(TESTPORT, CERTFILE)
+
+ server = AsyncoreHTTPSServer(CERTFILE)
flag = threading.Event()
server.start(flag)
# wait for it to start
@@ -948,8 +940,8 @@ else:
d1 = open(CERTFILE, 'rb').read()
d2 = ''
# now fetch the same data from the HTTPS server
- url = 'https://127.0.0.1:%d/%s' % (
- TESTPORT, os.path.split(CERTFILE)[1])
+ url = 'https://%s:%d/%s' % (
+ HOST, server.port, os.path.split(CERTFILE)[1])
f = urllib.urlopen(url)
dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0):
@@ -978,71 +970,11 @@ else:
sys.stdout.write('joining thread\n')
server.join()
- def testAsyncoreServer(self):
-
- if test_support.verbose:
- sys.stdout.write("\n")
-
- indata="FOO\n"
- server = AsyncoreEchoServer(TESTPORT, CERTFILE)
- flag = threading.Event()
- server.start(flag)
- # wait for it to start
- flag.wait()
- # try to connect
- try:
- s = ssl.wrap_socket(socket.socket())
- s.connect(('127.0.0.1', TESTPORT))
- except ssl.SSLError as x:
- raise test_support.TestFailed("Unexpected SSL error: " + str(x))
- except Exception as x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
- else:
- if test_support.verbose:
- sys.stdout.write(
- " client: sending %s...\n" % (repr(indata)))
- s.sendall(indata.encode('ASCII', 'strict'))
- outdata = s.recv()
- if test_support.verbose:
- sys.stdout.write(" client: read %s\n" % repr(outdata))
- outdata = str(outdata, 'ASCII', 'strict')
- if outdata != indata.lower():
- raise test_support.TestFailed(
- "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
- % (repr(outdata[:min(len(outdata),20)]), len(outdata),
- repr(indata[:min(len(indata),20)].lower()), len(indata)))
- s.write("over\n".encode("ASCII", "strict"))
- if test_support.verbose:
- sys.stdout.write(" client: closing connection.\n")
- s.close()
- finally:
- server.stop()
- server.join()
-
-
-def findtestsocket(start, end):
- def testbind(i):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- s.bind(("127.0.0.1", i))
- except:
- return 0
- else:
- return 1
- finally:
- s.close()
-
- for i in range(start, end):
- if testbind(i) and testbind(i+1):
- return i
- return 0
-
-
def test_main(verbose=False):
if skip_expected:
raise test_support.TestSkipped("No SSL support")
- global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT
+ global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
@@ -1053,10 +985,6 @@ def test_main(verbose=False):
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
raise test_support.TestFailed("Can't read certificate files!")
- TESTPORT = findtestsocket(10025, 12000)
- if not TESTPORT:
- raise test_support.TestFailed("Can't find open port to test servers on!")
-
tests = [BasicTests]
if test_support.is_resource_enabled('network'):
diff --git a/Lib/test/test_sundry.py b/Lib/test/test_sundry.py
index d878e7a..64a9690 100644
--- a/Lib/test/test_sundry.py
+++ b/Lib/test/test_sundry.py
@@ -1,111 +1,119 @@
"""Do a minimal test of all the modules that aren't otherwise tested."""
-from test.test_support import catch_warning
+from test import test_support
import sys
+import unittest
import warnings
-with catch_warning():
- from test.test_support import verbose
+class TestUntestedModules(unittest.TestCase):
+ def test_at_least_import_untested_modules(self):
+ with test_support.catch_warning():
+ import BaseHTTPServer
+ import DocXMLRPCServer
+ import CGIHTTPServer
+ import SimpleHTTPServer
+ import SimpleXMLRPCServer
+ import aifc
+ import bdb
+ import cgitb
+ import cmd
+ import code
+ import compileall
- import BaseHTTPServer
- import DocXMLRPCServer
- import CGIHTTPServer
- import SimpleHTTPServer
- import SimpleXMLRPCServer
- import aifc
- import bdb
- import cgitb
- import cmd
- import code
- import compileall
+ import distutils.archive_util
+ import distutils.bcppcompiler
+ import distutils.ccompiler
+ import distutils.cmd
+ import distutils.core
+ import distutils.cygwinccompiler
+ import distutils.dep_util
+ import distutils.dir_util
+ import distutils.emxccompiler
+ import distutils.errors
+ import distutils.extension
+ import distutils.file_util
+ import distutils.filelist
+ import distutils.log
+ if sys.platform.startswith('win'):
+ import distutils.msvccompiler
+ import distutils.mwerkscompiler
+ import distutils.sysconfig
+ import distutils.text_file
+ import distutils.unixccompiler
+ import distutils.util
+ import distutils.version
- import distutils.archive_util
- import distutils.bcppcompiler
- import distutils.ccompiler
- import distutils.cmd
- import distutils.core
- import distutils.cygwinccompiler
- import distutils.dep_util
- import distutils.dir_util
- import distutils.emxccompiler
- import distutils.errors
- import distutils.extension
- import distutils.file_util
- import distutils.filelist
- import distutils.log
- if sys.platform.startswith('win'):
- import distutils.msvccompiler
- import distutils.mwerkscompiler
- import distutils.sysconfig
- import distutils.text_file
- import distutils.unixccompiler
- import distutils.util
- import distutils.version
+ import distutils.command.bdist_dumb
+ if sys.platform.startswith('win'):
+ import distutils.command.bdist_msi
+ import distutils.command.bdist
+ import distutils.command.bdist_rpm
+ import distutils.command.bdist_wininst
+ import distutils.command.build_clib
+ import distutils.command.build_ext
+ import distutils.command.build
+ import distutils.command.build_py
+ import distutils.command.build_scripts
+ import distutils.command.clean
+ import distutils.command.config
+ import distutils.command.install_data
+ import distutils.command.install_egg_info
+ import distutils.command.install_headers
+ import distutils.command.install_lib
+ import distutils.command.install
+ import distutils.command.install_scripts
+ import distutils.command.register
+ import distutils.command.sdist
+ import distutils.command.upload
- import distutils.command.bdist_dumb
- if sys.platform.startswith('win'):
- import distutils.command.bdist_msi
- import distutils.command.bdist
- import distutils.command.bdist_rpm
- import distutils.command.bdist_wininst
- import distutils.command.build_clib
- import distutils.command.build_ext
- import distutils.command.build
- import distutils.command.build_py
- import distutils.command.build_scripts
- import distutils.command.clean
- import distutils.command.config
- import distutils.command.install_data
- import distutils.command.install_egg_info
- import distutils.command.install_headers
- import distutils.command.install_lib
- import distutils.command.install
- import distutils.command.install_scripts
- import distutils.command.register
- import distutils.command.sdist
- import distutils.command.upload
+ import encodings
+ import formatter
+ import ftplib
+ import getpass
+ import htmlentitydefs
+ import ihooks
+ import imghdr
+ import imputil
+ import keyword
+ import linecache
+ import macurl2path
+ import mailcap
+ import mutex
+ import nntplib
+ import nturl2path
+ import opcode
+ import os2emxpath
+ import pdb
+ import pstats
+ import py_compile
+ import pydoc
+ import rlcompleter
+ import sched
+ import smtplib
+ import sndhdr
+ import statvfs
+ import sunau
+ import sunaudio
+ import symbol
+ import tabnanny
+ import telnetlib
+ import timeit
+ import token
+ try:
+ import tty # not available on Windows
+ except ImportError:
+ if test_support.verbose:
+ print("skipping tty")
- import encodings
- import formatter
- import ftplib
- import getpass
- import htmlentitydefs
- import ihooks
- import imghdr
- import imputil
- import keyword
- import linecache
- import macurl2path
- import mailcap
- import mutex
- import nntplib
- import nturl2path
- import opcode
- import os2emxpath
- import pdb
- import pstats
- import py_compile
- import pydoc
- import rlcompleter
- import sched
- import smtplib
- import sndhdr
- import statvfs
- import sunau
- import sunaudio
- import symbol
- import tabnanny
- import telnetlib
- import timeit
- import token
- try:
- import tty # not available on Windows
- except ImportError:
- if verbose:
- print("skipping tty")
+ # Can't test the "user" module -- if the user has a ~/.pythonrc.py, it
+ # can screw up all sorts of things (esp. if it prints!).
+ #import user
+ import webbrowser
+ import xml
- # Can't test the "user" module -- if the user has a ~/.pythonrc.py, it
- # can screw up all sorts of things (esp. if it prints!).
- #import user
- import webbrowser
- import xml
+
+def test_main():
+ test_support.run_unittest(TestUntestedModules)
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index d2be9bf..b60f98c 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -103,32 +103,100 @@ def requires(resource, msg=None):
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
-def bind_port(sock, host='', preferred_port=54321):
- """Try to bind the sock to a port. If we are running multiple
- tests and we don't try multiple ports, the test can fail. This
- makes the test more robust."""
-
- # Find some random ports that hopefully no one is listening on.
- # Ideally each test would clean up after itself and not continue listening
- # on any ports. However, this isn't the case. The last port (0) is
- # a stop-gap that asks the O/S to assign a port. Whenever the warning
- # message below is printed, the test that is listening on the port should
- # be fixed to close the socket at the end of the test.
- # Another reason why we can't use a port is another process (possibly
- # another instance of the test suite) is using the same port.
- for port in [preferred_port, 9907, 10243, 32999, 0]:
- try:
- sock.bind((host, port))
- if port == 0:
- port = sock.getsockname()[1]
- return port
- except socket.error as e:
- (err, msg) = e.args
- if err != errno.EADDRINUSE:
- raise
- print(' WARNING: failed to listen on port %d, ' % port +
- 'trying another', file=sys.__stderr__)
- raise TestFailed('unable to find port to listen on')
+HOST = 'localhost'
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to 0.0.0.0) with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it.
+ """
+
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
FUZZ = 1e-6
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py
index 8eee666..e4ee1b5 100644
--- a/Lib/test/test_telnetlib.py
+++ b/Lib/test/test_telnetlib.py
@@ -6,14 +6,9 @@ import time
from unittest import TestCase
from test import test_support
-PORT = 9091
+HOST = test_support.HOST
-def server(evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(serv, "", PORT)
+def server(evt, serv):
serv.listen(5)
evt.set()
try:
@@ -28,7 +23,10 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
self.evt.wait()
self.evt.clear()
time.sleep(.1)
@@ -38,24 +36,24 @@ class GeneralTests(TestCase):
def testBasic(self):
# connects
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
telnet.sock.close()
def testTimeoutDefault(self):
# default
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
self.assertTrue(telnet.sock.gettimeout() is None)
telnet.sock.close()
def testTimeoutValue(self):
# a value
- telnet = telnetlib.Telnet("localhost", PORT, timeout=30)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testTimeoutDifferentOrder(self):
telnet = telnetlib.Telnet(timeout=30)
- telnet.open("localhost", PORT)
+ telnet.open(HOST, self.port)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
@@ -64,7 +62,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- telnet = telnetlib.Telnet("localhost", PORT, timeout=None)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(telnet.sock.gettimeout(), 30)
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py
index 65e633b..4fa263d 100644
--- a/Lib/test/test_zlib.py
+++ b/Lib/test/test_zlib.py
@@ -75,6 +75,11 @@ class ExceptionTestCase(unittest.TestCase):
# verify failure on building decompress object with bad params
self.assertRaises(ValueError, zlib.decompressobj, 0)
+ def test_decompressobj_badflush(self):
+ # verify failure on calling decompressobj.flush with bad params
+ self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
+ self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
+
class CompressTestCase(unittest.TestCase):
diff --git a/Lib/types.py b/Lib/types.py
index 65b70f7..ab354d1 100644
--- a/Lib/types.py
+++ b/Lib/types.py
@@ -34,16 +34,8 @@ except TypeError:
FrameType = type(tb.tb_frame)
tb = None; del tb
-# Extension types defined in a C helper module. XXX There may be no
-# equivalent in implementations other than CPython, so it seems better to
-# leave them undefined then to set them to e.g. None.
-try:
- import _types
-except ImportError:
- pass
-else:
- GetSetDescriptorType = type(_types.Helper.getter)
- MemberDescriptorType = type(_types.Helper.member)
- del _types
+# For Jython, the following two types are identical
+GetSetDescriptorType = type(FunctionType.__code__)
+MemberDescriptorType = type(FunctionType.__globals__)
del sys, _f, _g, _C, # Not for export