summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_asynchat.py37
-rw-r--r--Lib/test/test_asyncore.py30
-rw-r--r--Lib/test/test_capi.py5
-rw-r--r--Lib/test/test_ftplib.py31
-rw-r--r--Lib/test/test_httplib.py22
-rw-r--r--Lib/test/test_poplib.py24
-rw-r--r--Lib/test/test_signal.py19
-rw-r--r--Lib/test/test_smtplib.py78
-rw-r--r--Lib/test/test_socket.py65
-rw-r--r--Lib/test/test_socketserver.py2
-rw-r--r--Lib/test/test_ssl.py194
-rw-r--r--Lib/test/test_sundry.py210
-rw-r--r--Lib/test/test_support.py120
-rw-r--r--Lib/test/test_telnetlib.py24
-rw-r--r--Lib/test/test_zlib.py5
15 files changed, 435 insertions, 431 deletions
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py
index 2912057..d24e8cc 100644
--- a/Lib/test/test_asynchat.py
+++ b/Lib/test/test_asynchat.py
@@ -6,8 +6,7 @@ import unittest
import sys
from test import test_support
-HOST = "127.0.0.1"
-PORT = 54322
+HOST = test_support.HOST
SERVER_QUIT = b'QUIT\n'
class echo_server(threading.Thread):
@@ -18,15 +17,13 @@ class echo_server(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.port = test_support.bind_port(self.sock)
def run(self):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(sock, HOST, PORT)
- sock.listen(1)
+ self.sock.listen(1)
self.event.set()
- conn, client = sock.accept()
+ conn, client = self.sock.accept()
self.buffer = b""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
@@ -50,15 +47,15 @@ class echo_server(threading.Thread):
pass
conn.close()
- sock.close()
+ self.sock.close()
class echo_client(asynchat.async_chat):
- def __init__(self, terminator):
+ def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect((HOST, PORT))
+ self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = b""
@@ -106,7 +103,7 @@ class TestAsynchat(unittest.TestCase):
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
- c = echo_client(term)
+ c = echo_client(term, s.port)
c.push(b"hello ")
c.push(bytes("world%s" % term, "ascii"))
c.push(bytes("I'm not dead yet!%s" % term, "ascii"))
@@ -138,7 +135,7 @@ class TestAsynchat(unittest.TestCase):
def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(termlen)
+ c = echo_client(termlen, s.port)
data = b"hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
@@ -158,7 +155,7 @@ class TestAsynchat(unittest.TestCase):
def test_none_terminator(self):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(None)
+ c = echo_client(None, s.port)
data = b"hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
@@ -170,7 +167,7 @@ class TestAsynchat(unittest.TestCase):
def test_simple_producer(self):
s, event = start_echo_server()
- c = echo_client(b'\n')
+ c = echo_client(b'\n', s.port)
data = b"hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p)
@@ -181,7 +178,7 @@ class TestAsynchat(unittest.TestCase):
def test_string_producer(self):
s, event = start_echo_server()
- c = echo_client(b'\n')
+ c = echo_client(b'\n', s.port)
data = b"hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
@@ -192,8 +189,8 @@ class TestAsynchat(unittest.TestCase):
def test_empty_line(self):
# checks that empty lines are handled correctly
s, event = start_echo_server()
- c = echo_client(b'\n')
- c.push(b"hello world\n\nI'm not dead yet!\n")
+ c = echo_client(b'\n', s.port)
+ c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
@@ -203,8 +200,8 @@ class TestAsynchat(unittest.TestCase):
def test_close_when_done(self):
s, event = start_echo_server()
- c = echo_client(b'\n')
- c.push(b"hello world\nI'm not dead yet!\n")
+ c = echo_client(b'\n', s.port)
+ c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 6dc73ad..09401bd 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -9,10 +9,10 @@ import time
from test import test_support
from test.test_support import TESTFN, run_unittest, unlink
-from io import StringIO, BytesIO
+from io import BytesIO
+from io import StringIO
-HOST = "127.0.0.1"
-PORT = None
+HOST = test_support.HOST
class dummysocket:
def __init__(self):
@@ -52,14 +52,8 @@ class crashingdummy:
self.error_handled = True
# used when testing senders; just collects what it gets until newline is sent
-def capture_server(evt, buf):
+def capture_server(evt, buf, serv):
try:
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
serv.listen(5)
conn, addr = serv.accept()
except socket.timeout:
@@ -80,7 +74,6 @@ def capture_server(evt, buf):
conn.close()
finally:
serv.close()
- PORT = None
evt.set()
@@ -339,14 +332,13 @@ class DispatcherWithSendTests(unittest.TestCase):
def test_send(self):
self.evt = threading.Event()
- cap = BytesIO()
- threading.Thread(target=capture_server, args=(self.evt, cap)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
- # wait until server thread has assigned a port number
- n = 1000
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
+ cap = BytesIO()
+ args = (self.evt, cap, self.sock)
+ threading.Thread(target=capture_server, args=args).start()
# wait a little longer for the server to initialize (it sometimes
# refuses connections on slow machines without this wait)
@@ -355,7 +347,7 @@ class DispatcherWithSendTests(unittest.TestCase):
data = b"Suppose there isn't a 16-ton weight?"
d = dispatcherwithsend_noread()
d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- d.connect((HOST, PORT))
+ d.connect((HOST, self.port))
# give time for socket to connect
time.sleep(0.1)
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index 107d4b4..87be9ee 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -16,9 +16,6 @@ def test_main():
# some extra thread-state tests driven via _testcapi
def TestThreadState():
- import thread
- import time
-
if test_support.verbose:
print("auto-thread-state")
@@ -42,6 +39,8 @@ def test_main():
have_thread_state = False
if have_thread_state:
+ import thread
+ import time
TestThreadState()
import threading
t=threading.Thread(target=TestThreadState)
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index 1ff8d08..0204c6f 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -6,18 +6,13 @@ import time
from unittest import TestCase
from test import test_support
-server_port = None
+HOST = test_support.HOST
# This function sets the evt 3 times:
# 1) when the connection is ready to be accepted.
# 2) when it is safe for the caller to close the connection
# 3) when we have closed the socket
-def server(evt):
- global server_port
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_port = test_support.bind_port(serv, "", 9091)
+def server(evt, serv):
serv.listen(5)
# (1) Signal the caller that we are ready to accept the connection.
@@ -40,14 +35,16 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
# Wait for the server to be ready.
self.evt.wait()
self.evt.clear()
- ftplib.FTP.port = server_port
+ ftplib.FTP.port = self.port
def tearDown(self):
- # Wait on the closing of the socket (this shouldn't be necessary).
self.evt.wait()
def testBasic(self):
@@ -55,34 +52,34 @@ class GeneralTests(TestCase):
ftplib.FTP()
# connects
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.evt.wait()
ftp.sock.close()
def testTimeoutDefault(self):
# default
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.assertTrue(ftp.sock.gettimeout() is None)
self.evt.wait()
ftp.sock.close()
def testTimeoutValue(self):
# a value
- ftp = ftplib.FTP("localhost", timeout=30)
+ ftp = ftplib.FTP(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutConnect(self):
ftp = ftplib.FTP()
- ftp.connect("localhost", timeout=30)
+ ftp.connect(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutDifferentOrder(self):
ftp = ftplib.FTP(timeout=30)
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
@@ -90,7 +87,7 @@ class GeneralTests(TestCase):
def testTimeoutDirectAccess(self):
ftp = ftplib.FTP()
ftp.timeout = 30
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
@@ -100,7 +97,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- ftp = ftplib.FTP("localhost", timeout=None)
+ ftp = ftplib.FTP(HOST, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(ftp.sock.gettimeout(), 30)
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index ca801da..f0e551f 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -6,6 +6,8 @@ from unittest import TestCase
from test import test_support
+HOST = test_support.HOST
+
class FakeSocket:
def __init__(self, text, fileclass=io.BytesIO):
if isinstance(text, str):
@@ -199,16 +201,12 @@ class OfflineTest(TestCase):
def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
-PORT = 50003
-HOST = "localhost"
-
class TimeoutTest(TestCase):
+ PORT = None
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ TimeoutTest.PORT = test_support.bind_port(self.serv)
self.serv.listen(5)
def tearDown(self):
@@ -220,13 +218,13 @@ class TimeoutTest(TestCase):
# and into the socket.
# default
- httpConn = httplib.HTTPConnection(HOST, PORT)
+ httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
httpConn.connect()
self.assertTrue(httpConn.sock.gettimeout() is None)
httpConn.close()
# a value
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=30)
+ httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
httpConn.connect()
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
@@ -235,7 +233,8 @@ class TimeoutTest(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=None)
+ httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
+ timeout=None)
httpConn.connect()
finally:
socket.setdefaulttimeout(previous)
@@ -249,11 +248,12 @@ class HTTPSTimeoutTest(TestCase):
def test_attributes(self):
# simple test to check it's storing it
if hasattr(httplib, 'HTTPSConnection'):
- h = httplib.HTTPSConnection(HOST, PORT, timeout=30)
+ h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
self.assertEqual(h.timeout, 30)
def test_main(verbose=None):
- test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTimeoutTest)
+ test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
+ HTTPSTimeoutTest)
if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 983cf21..cd550df 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -6,14 +6,10 @@ import time
from unittest import TestCase
from test import test_support
+HOST = test_support.HOST
-def server(ready, evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 9091))
+def server(evt, serv):
serv.listen(5)
- ready.set()
try:
conn, addr = serv.accept()
except socket.timeout:
@@ -29,27 +25,29 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- self.ready = threading.Event()
- threading.Thread(target=server, args=(self.ready, self.evt,)).start()
- self.ready.wait()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
+ time.sleep(.1)
def tearDown(self):
self.evt.wait()
def testBasic(self):
# connects
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
pop.sock.close()
def testTimeoutDefault(self):
# default
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
self.assertTrue(pop.sock.gettimeout() is None)
pop.sock.close()
def testTimeoutValue(self):
# a value
- pop = poplib.POP3("localhost", 9091, timeout=30)
+ pop = poplib.POP3(HOST, self.port, timeout=30)
self.assertEqual(pop.sock.gettimeout(), 30)
pop.sock.close()
@@ -58,7 +56,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- pop = poplib.POP3("localhost", 9091, timeout=None)
+ pop = poplib.POP3(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(pop.sock.gettimeout(), 30)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index a410710..339cec2 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -48,16 +48,21 @@ class InterProcessSignalTests(unittest.TestCase):
if self.using_gc:
gc.enable()
- def handlerA(self, *args):
+ def format_frame(self, frame, limit=None):
+ return ''.join(traceback.format_stack(frame, limit=limit))
+
+ def handlerA(self, signum, frame):
self.a_called = True
if test_support.verbose:
- print("handlerA invoked", args)
+ print("handlerA invoked from signal %s at:\n%s" % (
+ signum, self.format_frame(frame, limit=1)))
- def handlerB(self, *args):
+ def handlerB(self, signum, frame):
self.b_called = True
if test_support.verbose:
- print("handlerB invoked", args)
- raise HandlerBCalled(*args)
+ print ("handlerB invoked from signal %s at:\n%s" % (
+ signum, self.format_frame(frame, limit=1)))
+ raise HandlerBCalled(signum, self.format_frame(frame))
def wait(self, child):
"""Wait for child to finish, ignoring EINTR."""
@@ -95,6 +100,10 @@ class InterProcessSignalTests(unittest.TestCase):
self.assertFalse(self.b_called)
self.a_called = False
+ # Make sure the signal isn't delivered while the previous
+ # Popen object is being destroyed, because __del__ swallows
+ # exceptions.
+ del child
try:
child = subprocess.Popen(['kill', '-USR1', str(pid)])
# This wait should be interrupted by the signal's exception.
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index e1c198f5..9dc672d 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -12,18 +12,9 @@ import select
from unittest import TestCase
from test import test_support
-# PORT is used to communicate the port number assigned to the server
-# to the test client
-HOST = "localhost"
-PORT = None
-
-def server(evt, buf):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(15)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
+HOST = test_support.HOST
+
+def server(evt, buf, serv):
serv.listen(5)
evt.set()
try:
@@ -43,14 +34,16 @@ def server(evt, buf):
conn.close()
finally:
serv.close()
- PORT = None
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- servargs = (self.evt, b"220 Hola mundo\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, b"220 Hola mundo\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
@@ -60,29 +53,29 @@ class GeneralTests(TestCase):
def testBasic1(self):
# connects
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
smtp.sock.close()
def testBasic2(self):
# connects, include port in host name
- smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
+ smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
smtp.sock.close()
def testLocalHostName(self):
# check that supplied local_hostname is used
- smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
def testTimeoutDefault(self):
# default
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close()
def testTimeoutValue(self):
# a value
- smtp = smtplib.SMTP(HOST, PORT, timeout=30)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
@@ -91,7 +84,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- smtp = smtplib.SMTP(HOST, PORT, timeout=None)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30)
@@ -99,10 +92,7 @@ class GeneralTests(TestCase):
# Test server thread using the specified SMTP server class
-def debugging_server(server_class, serv_evt, client_evt):
- serv = server_class(("", 0), ('nowhere', -1))
- global PORT
- PORT = serv.getsockname()[1]
+def debugging_server(serv, serv_evt, client_evt):
serv_evt.set()
try:
@@ -131,7 +121,6 @@ def debugging_server(server_class, serv_evt, client_evt):
time.sleep(0.5)
serv.close()
asyncore.close_all()
- PORT = None
serv_evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
@@ -153,7 +142,9 @@ class DebuggingServerTests(TestCase):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (smtpd.DebuggingServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = smtpd.DebuggingServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
@@ -170,31 +161,31 @@ class DebuggingServerTests(TestCase):
def testBasic(self):
# connect
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.quit()
def testNOOP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, b'Ok')
self.assertEqual(smtp.noop(), expected)
smtp.quit()
def testRSET(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, b'Ok')
self.assertEqual(smtp.rset(), expected)
smtp.quit()
def testNotImplemented(self):
# EHLO isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, b'Error: command "EHLO" not implemented')
self.assertEqual(smtp.ehlo(), expected)
smtp.quit()
def testVRFY(self):
# VRFY isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, b'Error: command "VRFY" not implemented')
self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
@@ -203,21 +194,21 @@ class DebuggingServerTests(TestCase):
def testSecondHELO(self):
# check that a second HELO returns a message that it's a duplicate
# (this behavior is specific to smtpd.SMTPChannel)
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.helo()
expected = (503, b'Duplicate HELO/EHLO')
self.assertEqual(smtp.helo(), expected)
smtp.quit()
def testHELP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented')
smtp.quit()
def testSend(self):
# connect and send mail
m = 'A test message'
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.sendmail('John', 'Sally', m)
smtp.quit()
@@ -257,7 +248,10 @@ class BadHELOServerTests(TestCase):
sys.stdout = self.output
self.evt = threading.Event()
- servargs = (self.evt, b"199 no hello for you!\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, b"199 no hello for you!\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
@@ -268,7 +262,7 @@ class BadHELOServerTests(TestCase):
def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
- HOST, PORT, 'localhost', 3)
+ HOST, self.port, 'localhost', 3)
sim_users = {'Mr.A@somewhere.com':'John A',
@@ -333,7 +327,9 @@ class SMTPSimTests(TestCase):
def setUp(self):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (SimSMTPServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
@@ -348,11 +344,11 @@ class SMTPSimTests(TestCase):
def testBasic(self):
# smoke test
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.quit()
def testEHLO(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
# no features should be present before the EHLO
self.assertEqual(smtp.esmtp_features, {})
@@ -373,7 +369,7 @@ class SMTPSimTests(TestCase):
smtp.quit()
def testVRFY(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for email, name in sim_users.items():
expected_known = (250, bytes('%s %s' %
@@ -388,7 +384,7 @@ class SMTPSimTests(TestCase):
smtp.quit()
def testEXPN(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for listname, members in sim_lists.items():
users = []
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 2bec373..8c5cf93 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -3,6 +3,7 @@
import unittest
from test import test_support
+import errno
import socket
import select
import thread, threading
@@ -15,17 +16,14 @@ import array
from weakref import proxy
import signal
-PORT = 50007
-HOST = 'localhost'
+HOST = test_support.HOST
MSG = b'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
self.serv.listen(1)
def tearDown(self):
@@ -36,9 +34,7 @@ class SocketUDPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
def tearDown(self):
self.serv.close()
@@ -190,7 +186,7 @@ class SocketConnectedTest(ThreadedTCPSocketTest):
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
self.serv_conn = self.cli
def clientTearDown(self):
@@ -470,16 +466,23 @@ class GeneralModuleTests(unittest.TestCase):
# XXX The following don't test module-level functionality...
def testSockName(self):
- # Testing getsockname()
+ # Testing getsockname(). Use a temporary socket to elicit an unused
+ # ephemeral port that we can use later in the test.
+ tempsock = socket.socket()
+ tempsock.bind(("0.0.0.0", 0))
+ (host, port) = tempsock.getsockname()
+ tempsock.close()
+ del tempsock
+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(("0.0.0.0", PORT+1))
+ sock.bind(("0.0.0.0", port))
name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
my_ip_addr = socket.gethostbyname(socket.gethostname())
self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
- self.assertEqual(name[1], PORT+1)
+ self.assertEqual(name[1], port)
def testGetSockOpt(self):
# Testing getsockopt()
@@ -615,7 +618,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG)
def _testSendtoAndRecv(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFrom(self):
# Testing recvfrom() over UDP
@@ -623,14 +626,14 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFromNegative(self):
# Negative lengths passed to recvfrom should give ValueError.
self.assertRaises(ValueError, self.serv.recvfrom, -1)
def _testRecvFromNegative(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
class TCPCloserTest(ThreadedTCPSocketTest):
@@ -648,7 +651,7 @@ class TCPCloserTest(ThreadedTCPSocketTest):
conn.close()
def _testClose(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
@@ -706,7 +709,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testAccept(self):
time.sleep(0.1)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testConnect(self):
# Testing non-blocking connect
@@ -714,7 +717,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testConnect(self):
self.cli.settimeout(10)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testRecv(self):
# Testing non-blocking recv
@@ -734,7 +737,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.fail("Error during select call to non-blocking socket.")
def _testRecv(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(0.1)
self.cli.send(MSG)
@@ -883,7 +886,9 @@ class NetworkConnectionTest(object):
"""Prove network connection."""
def clientSetUp(self):
- self.cli = socket.create_connection((HOST, PORT))
+ # We're inherited below by BasicTCPTest2, which also inherits
+ # BasicTCPTest, which defines self.port referenced below.
+ self.cli = socket.create_connection((HOST, self.port))
self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
@@ -893,7 +898,11 @@ class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
class NetworkConnectionNoServer(unittest.TestCase):
def testWithoutServer(self):
- self.failUnlessRaises(socket.error, lambda: socket.create_connection((HOST, PORT)))
+ port = test_support.find_unused_port()
+ self.failUnlessRaises(
+ socket.error,
+ lambda: socket.create_connection((HOST, port))
+ )
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
@@ -914,22 +923,22 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
testFamily = _justAccept
def _testFamily(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.family, 2)
testTimeoutDefault = _justAccept
def _testTimeoutDefault(self):
- self.cli = socket.create_connection((HOST, PORT))
+ self.cli = socket.create_connection((HOST, self.port))
self.assertTrue(self.cli.gettimeout() is None)
testTimeoutValueNamed = _justAccept
def _testTimeoutValueNamed(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutValueNonamed = _justAccept
def _testTimeoutValueNonamed(self):
- self.cli = socket.create_connection((HOST, PORT), 30)
+ self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutNone = _justAccept
@@ -937,7 +946,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- self.cli = socket.create_connection((HOST, PORT), timeout=None)
+ self.cli = socket.create_connection((HOST, self.port), timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(self.cli.gettimeout(), 30)
@@ -964,12 +973,12 @@ class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
testOutsideTimeout = testInsideTimeout
def _testInsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT))
+ self.cli = sock = socket.create_connection((HOST, self.port))
data = sock.recv(5)
self.assertEqual(data, b"done!")
def _testOutsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT), timeout=1)
+ self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 0656176..3946078 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -22,7 +22,7 @@ from test.test_support import TESTFN as TEST_FILE
test.test_support.requires("network")
TEST_STR = b"hello world\n"
-HOST = "localhost"
+HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 520f440..99ed00f 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -25,11 +25,10 @@ try:
except ImportError:
skip_expected = True
+HOST = test_support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
-TESTPORT = 10025
-
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
if test_support.verbose:
@@ -299,7 +298,7 @@ else:
except:
handle_error('')
- def __init__(self, port, certificate, ssl_version=None,
+ def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None, expect_bad_connects=False,
chatty=True, connectionchatty=False, starttls_server=False):
if ssl_version is None:
@@ -315,12 +314,8 @@ else:
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
+ self.port = test_support.bind_port(self.sock)
self.flag = None
- if hasattr(socket, 'SO_REUSEADDR'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(False)
@@ -471,12 +466,13 @@ else:
format%args))
- def __init__(self, port, certfile):
+ def __init__(self, certfile):
self.flag = None
self.active = False
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
+ self.port = test_support.find_unused_port()
self.server = self.HTTPSServer(
- ('', port), self.RootedHTTPRequestHandler, certfile)
+ (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
threading.Thread.__init__(self)
self.setDaemon(True)
@@ -586,7 +582,7 @@ else:
self.server.close()
def badCertTest (certfile):
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_REQUIRED,
cacerts=CERTFILE, chatty=False,
connectionchatty=False)
@@ -600,7 +596,7 @@ else:
s = ssl.wrap_socket(socket.socket(),
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError as x:
if test_support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x)
@@ -616,7 +612,7 @@ else:
indata="FOO\n",
chatty=False, connectionchatty=False):
- server = ThreadedEchoServer(TESTPORT, certfile,
+ server = ThreadedEchoServer(certfile,
certreqs=certreqs,
ssl_version=protocol,
cacerts=cacertsfile,
@@ -631,12 +627,11 @@ else:
client_protocol = protocol
try:
s = ssl.wrap_socket(socket.socket(),
- server_side=False,
certfile=client_certfile,
ca_certs=cacertsfile,
cert_reqs=certreqs,
ssl_version=client_protocol)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError as x:
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
except Exception as x:
@@ -646,18 +641,17 @@ else:
if test_support.verbose:
sys.stdout.write(
" client: sending %s...\n" % (repr(indata)))
- s.write(indata.encode('ASCII', 'strict'))
+ s.write(indata)
outdata = s.read()
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: read %s\n" % repr(outdata))
- outdata = str(outdata, 'ASCII', 'strict')
if outdata != indata.lower():
raise test_support.TestFailed(
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
- % (repr(outdata[:min(len(outdata),20)]), len(outdata),
- repr(indata[:min(len(indata),20)].lower()), len(indata)))
- s.write("over\n".encode("ASCII", "strict"))
+ % (outdata[:min(len(outdata),20)], len(outdata),
+ indata[:min(len(indata),20)].lower(), len(indata)))
+ s.write("over\n")
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
@@ -703,7 +697,44 @@ else:
class ThreadedTests(unittest.TestCase):
- def testEcho (self):
+ def testRudeShutdown(self):
+
+ listener_ready = threading.Event()
+ listener_gone = threading.Event()
+ port = test_support.find_unused_port()
+
+ # `listener` runs in a thread. It opens a socket listening on
+ # PORT, and sits in an accept() until the main thread connects.
+ # Then it rudely closes the socket, and sets Event `listener_gone`
+ # to let the main thread know the socket is gone.
+ def listener():
+ s = socket.socket()
+ s.bind((HOST, port))
+ s.listen(5)
+ listener_ready.set()
+ s.accept()
+ s = None # reclaim the socket object, which also closes it
+ listener_gone.set()
+
+ def connector():
+ listener_ready.wait()
+ s = socket.socket()
+ s.connect((HOST, port))
+ listener_gone.wait()
+ try:
+ ssl_sock = ssl.wrap_socket(s)
+ except IOError:
+ pass
+ else:
+ raise test_support.TestFailed(
+ 'connecting to closed SSL socket should have failed')
+
+ t = threading.Thread(target=listener)
+ t.start()
+ connector()
+ t.join()
+
+ def testEcho(self):
if test_support.verbose:
sys.stdout.write("\n")
@@ -716,7 +747,7 @@ else:
if test_support.verbose:
sys.stdout.write("\n")
s2 = socket.socket()
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23,
cacerts=CERTFILE,
@@ -733,7 +764,7 @@ else:
ca_certs=CERTFILE,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_SSLv23)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError as x:
raise test_support.TestFailed(
"Unexpected SSL error: " + str(x))
@@ -776,46 +807,6 @@ else:
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
"badkey.pem"))
- def testRudeShutdown(self):
-
- listener_ready = threading.Event()
- listener_gone = threading.Event()
-
- # `listener` runs in a thread. It opens a socket listening on
- # PORT, and sits in an accept() until the main thread connects.
- # Then it rudely closes the socket, and sets Event `listener_gone`
- # to let the main thread know the socket is gone.
- def listener():
- s = socket.socket()
- if hasattr(socket, 'SO_REUSEADDR'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- s.bind(('127.0.0.1', TESTPORT))
- s.listen(5)
- listener_ready.set()
- s.accept()
- s = None # reclaim the socket object, which also closes it
- listener_gone.set()
-
- def connector():
- listener_ready.wait()
- s = socket.socket()
- s.connect(('127.0.0.1', TESTPORT))
- listener_gone.wait()
- try:
- ssl_sock = ssl.wrap_socket(s)
- except IOError:
- pass
- else:
- raise test_support.TestFailed(
- 'connecting to closed SSL socket should have failed')
-
- t = threading.Thread(target=listener)
- t.start()
- connector()
- t.join()
-
def testProtocolSSL2(self):
if test_support.verbose:
sys.stdout.write("\n")
@@ -873,7 +864,7 @@ else:
msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
ssl_version=ssl.PROTOCOL_TLSv1,
starttls_server=True,
chatty=True,
@@ -888,7 +879,7 @@ else:
try:
s = socket.socket()
s.setblocking(1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except Exception as x:
raise test_support.TestFailed("Unexpected exception: " + str(x))
else:
@@ -936,7 +927,8 @@ else:
def testSocketServer(self):
- server = OurHTTPSServer(TESTPORT, CERTFILE)
+
+ server = AsyncoreHTTPSServer(CERTFILE)
flag = threading.Event()
server.start(flag)
# wait for it to start
@@ -948,8 +940,8 @@ else:
d1 = open(CERTFILE, 'rb').read()
d2 = ''
# now fetch the same data from the HTTPS server
- url = 'https://127.0.0.1:%d/%s' % (
- TESTPORT, os.path.split(CERTFILE)[1])
+ url = 'https://%s:%d/%s' % (
+ HOST, server.port, os.path.split(CERTFILE)[1])
f = urllib.urlopen(url)
dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0):
@@ -978,71 +970,11 @@ else:
sys.stdout.write('joining thread\n')
server.join()
- def testAsyncoreServer(self):
-
- if test_support.verbose:
- sys.stdout.write("\n")
-
- indata="FOO\n"
- server = AsyncoreEchoServer(TESTPORT, CERTFILE)
- flag = threading.Event()
- server.start(flag)
- # wait for it to start
- flag.wait()
- # try to connect
- try:
- s = ssl.wrap_socket(socket.socket())
- s.connect(('127.0.0.1', TESTPORT))
- except ssl.SSLError as x:
- raise test_support.TestFailed("Unexpected SSL error: " + str(x))
- except Exception as x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
- else:
- if test_support.verbose:
- sys.stdout.write(
- " client: sending %s...\n" % (repr(indata)))
- s.sendall(indata.encode('ASCII', 'strict'))
- outdata = s.recv()
- if test_support.verbose:
- sys.stdout.write(" client: read %s\n" % repr(outdata))
- outdata = str(outdata, 'ASCII', 'strict')
- if outdata != indata.lower():
- raise test_support.TestFailed(
- "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
- % (repr(outdata[:min(len(outdata),20)]), len(outdata),
- repr(indata[:min(len(indata),20)].lower()), len(indata)))
- s.write("over\n".encode("ASCII", "strict"))
- if test_support.verbose:
- sys.stdout.write(" client: closing connection.\n")
- s.close()
- finally:
- server.stop()
- server.join()
-
-
-def findtestsocket(start, end):
- def testbind(i):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- s.bind(("127.0.0.1", i))
- except:
- return 0
- else:
- return 1
- finally:
- s.close()
-
- for i in range(start, end):
- if testbind(i) and testbind(i+1):
- return i
- return 0
-
-
def test_main(verbose=False):
if skip_expected:
raise test_support.TestSkipped("No SSL support")
- global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT
+ global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
@@ -1053,10 +985,6 @@ def test_main(verbose=False):
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
raise test_support.TestFailed("Can't read certificate files!")
- TESTPORT = findtestsocket(10025, 12000)
- if not TESTPORT:
- raise test_support.TestFailed("Can't find open port to test servers on!")
-
tests = [BasicTests]
if test_support.is_resource_enabled('network'):
diff --git a/Lib/test/test_sundry.py b/Lib/test/test_sundry.py
index d878e7a..64a9690 100644
--- a/Lib/test/test_sundry.py
+++ b/Lib/test/test_sundry.py
@@ -1,111 +1,119 @@
"""Do a minimal test of all the modules that aren't otherwise tested."""
-from test.test_support import catch_warning
+from test import test_support
import sys
+import unittest
import warnings
-with catch_warning():
- from test.test_support import verbose
+class TestUntestedModules(unittest.TestCase):
+ def test_at_least_import_untested_modules(self):
+ with test_support.catch_warning():
+ import BaseHTTPServer
+ import DocXMLRPCServer
+ import CGIHTTPServer
+ import SimpleHTTPServer
+ import SimpleXMLRPCServer
+ import aifc
+ import bdb
+ import cgitb
+ import cmd
+ import code
+ import compileall
- import BaseHTTPServer
- import DocXMLRPCServer
- import CGIHTTPServer
- import SimpleHTTPServer
- import SimpleXMLRPCServer
- import aifc
- import bdb
- import cgitb
- import cmd
- import code
- import compileall
+ import distutils.archive_util
+ import distutils.bcppcompiler
+ import distutils.ccompiler
+ import distutils.cmd
+ import distutils.core
+ import distutils.cygwinccompiler
+ import distutils.dep_util
+ import distutils.dir_util
+ import distutils.emxccompiler
+ import distutils.errors
+ import distutils.extension
+ import distutils.file_util
+ import distutils.filelist
+ import distutils.log
+ if sys.platform.startswith('win'):
+ import distutils.msvccompiler
+ import distutils.mwerkscompiler
+ import distutils.sysconfig
+ import distutils.text_file
+ import distutils.unixccompiler
+ import distutils.util
+ import distutils.version
- import distutils.archive_util
- import distutils.bcppcompiler
- import distutils.ccompiler
- import distutils.cmd
- import distutils.core
- import distutils.cygwinccompiler
- import distutils.dep_util
- import distutils.dir_util
- import distutils.emxccompiler
- import distutils.errors
- import distutils.extension
- import distutils.file_util
- import distutils.filelist
- import distutils.log
- if sys.platform.startswith('win'):
- import distutils.msvccompiler
- import distutils.mwerkscompiler
- import distutils.sysconfig
- import distutils.text_file
- import distutils.unixccompiler
- import distutils.util
- import distutils.version
+ import distutils.command.bdist_dumb
+ if sys.platform.startswith('win'):
+ import distutils.command.bdist_msi
+ import distutils.command.bdist
+ import distutils.command.bdist_rpm
+ import distutils.command.bdist_wininst
+ import distutils.command.build_clib
+ import distutils.command.build_ext
+ import distutils.command.build
+ import distutils.command.build_py
+ import distutils.command.build_scripts
+ import distutils.command.clean
+ import distutils.command.config
+ import distutils.command.install_data
+ import distutils.command.install_egg_info
+ import distutils.command.install_headers
+ import distutils.command.install_lib
+ import distutils.command.install
+ import distutils.command.install_scripts
+ import distutils.command.register
+ import distutils.command.sdist
+ import distutils.command.upload
- import distutils.command.bdist_dumb
- if sys.platform.startswith('win'):
- import distutils.command.bdist_msi
- import distutils.command.bdist
- import distutils.command.bdist_rpm
- import distutils.command.bdist_wininst
- import distutils.command.build_clib
- import distutils.command.build_ext
- import distutils.command.build
- import distutils.command.build_py
- import distutils.command.build_scripts
- import distutils.command.clean
- import distutils.command.config
- import distutils.command.install_data
- import distutils.command.install_egg_info
- import distutils.command.install_headers
- import distutils.command.install_lib
- import distutils.command.install
- import distutils.command.install_scripts
- import distutils.command.register
- import distutils.command.sdist
- import distutils.command.upload
+ import encodings
+ import formatter
+ import ftplib
+ import getpass
+ import htmlentitydefs
+ import ihooks
+ import imghdr
+ import imputil
+ import keyword
+ import linecache
+ import macurl2path
+ import mailcap
+ import mutex
+ import nntplib
+ import nturl2path
+ import opcode
+ import os2emxpath
+ import pdb
+ import pstats
+ import py_compile
+ import pydoc
+ import rlcompleter
+ import sched
+ import smtplib
+ import sndhdr
+ import statvfs
+ import sunau
+ import sunaudio
+ import symbol
+ import tabnanny
+ import telnetlib
+ import timeit
+ import token
+ try:
+ import tty # not available on Windows
+ except ImportError:
+ if test_support.verbose:
+ print("skipping tty")
- import encodings
- import formatter
- import ftplib
- import getpass
- import htmlentitydefs
- import ihooks
- import imghdr
- import imputil
- import keyword
- import linecache
- import macurl2path
- import mailcap
- import mutex
- import nntplib
- import nturl2path
- import opcode
- import os2emxpath
- import pdb
- import pstats
- import py_compile
- import pydoc
- import rlcompleter
- import sched
- import smtplib
- import sndhdr
- import statvfs
- import sunau
- import sunaudio
- import symbol
- import tabnanny
- import telnetlib
- import timeit
- import token
- try:
- import tty # not available on Windows
- except ImportError:
- if verbose:
- print("skipping tty")
+ # Can't test the "user" module -- if the user has a ~/.pythonrc.py, it
+ # can screw up all sorts of things (esp. if it prints!).
+ #import user
+ import webbrowser
+ import xml
- # Can't test the "user" module -- if the user has a ~/.pythonrc.py, it
- # can screw up all sorts of things (esp. if it prints!).
- #import user
- import webbrowser
- import xml
+
+def test_main():
+ test_support.run_unittest(TestUntestedModules)
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index d2be9bf..b60f98c 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -103,32 +103,100 @@ def requires(resource, msg=None):
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
-def bind_port(sock, host='', preferred_port=54321):
- """Try to bind the sock to a port. If we are running multiple
- tests and we don't try multiple ports, the test can fail. This
- makes the test more robust."""
-
- # Find some random ports that hopefully no one is listening on.
- # Ideally each test would clean up after itself and not continue listening
- # on any ports. However, this isn't the case. The last port (0) is
- # a stop-gap that asks the O/S to assign a port. Whenever the warning
- # message below is printed, the test that is listening on the port should
- # be fixed to close the socket at the end of the test.
- # Another reason why we can't use a port is another process (possibly
- # another instance of the test suite) is using the same port.
- for port in [preferred_port, 9907, 10243, 32999, 0]:
- try:
- sock.bind((host, port))
- if port == 0:
- port = sock.getsockname()[1]
- return port
- except socket.error as e:
- (err, msg) = e.args
- if err != errno.EADDRINUSE:
- raise
- print(' WARNING: failed to listen on port %d, ' % port +
- 'trying another', file=sys.__stderr__)
- raise TestFailed('unable to find port to listen on')
+HOST = 'localhost'
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to 0.0.0.0) with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it.
+ """
+
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
FUZZ = 1e-6
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py
index 8eee666..e4ee1b5 100644
--- a/Lib/test/test_telnetlib.py
+++ b/Lib/test/test_telnetlib.py
@@ -6,14 +6,9 @@ import time
from unittest import TestCase
from test import test_support
-PORT = 9091
+HOST = test_support.HOST
-def server(evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(serv, "", PORT)
+def server(evt, serv):
serv.listen(5)
evt.set()
try:
@@ -28,7 +23,10 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
self.evt.wait()
self.evt.clear()
time.sleep(.1)
@@ -38,24 +36,24 @@ class GeneralTests(TestCase):
def testBasic(self):
# connects
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
telnet.sock.close()
def testTimeoutDefault(self):
# default
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
self.assertTrue(telnet.sock.gettimeout() is None)
telnet.sock.close()
def testTimeoutValue(self):
# a value
- telnet = telnetlib.Telnet("localhost", PORT, timeout=30)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testTimeoutDifferentOrder(self):
telnet = telnetlib.Telnet(timeout=30)
- telnet.open("localhost", PORT)
+ telnet.open(HOST, self.port)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
@@ -64,7 +62,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- telnet = telnetlib.Telnet("localhost", PORT, timeout=None)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(telnet.sock.gettimeout(), 30)
diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py
index 65e633b..4fa263d 100644
--- a/Lib/test/test_zlib.py
+++ b/Lib/test/test_zlib.py
@@ -75,6 +75,11 @@ class ExceptionTestCase(unittest.TestCase):
# verify failure on building decompress object with bad params
self.assertRaises(ValueError, zlib.decompressobj, 0)
+ def test_decompressobj_badflush(self):
+ # verify failure on calling decompressobj.flush with bad params
+ self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
+ self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
+
class CompressTestCase(unittest.TestCase):