summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_asynchat.py213
-rw-r--r--Lib/test/test_asyncore.py193
-rw-r--r--Lib/test/test_codecmaps_cn.py7
-rw-r--r--Lib/test/test_math.py10
-rw-r--r--Lib/test/test_multibytecodec_support.py16
-rw-r--r--Lib/test/test_pow.py3
-rw-r--r--Lib/test/test_resource.py25
-rw-r--r--Lib/test/test_runpy.py137
-rw-r--r--Lib/test/test_smtplib.py210
-rw-r--r--Lib/test/test_socket_ssl.py19
-rw-r--r--Lib/test/test_unicodedata.py3
-rw-r--r--Lib/test/test_urllib2_localnet.py40
12 files changed, 671 insertions, 205 deletions
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py
index 7629296..be8e116 100644
--- a/Lib/test/test_asynchat.py
+++ b/Lib/test/test_asynchat.py
@@ -3,12 +3,17 @@
import thread # If this fails, we can't test this module
import asyncore, asynchat, socket, threading, time
import unittest
+import sys
from test import test_support
HOST = "127.0.0.1"
PORT = 54322
+SERVER_QUIT = b'QUIT\n'
class echo_server(threading.Thread):
+ # parameter to determine the number of bytes passed back to the
+ # client each send
+ chunk_size = 1
def run(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -17,15 +22,28 @@ class echo_server(threading.Thread):
PORT = test_support.bind_port(sock, HOST, PORT)
sock.listen(1)
conn, client = sock.accept()
- buffer = b""
- while b"\n" not in buffer:
+ self.buffer = b""
+ # collect data until quit message is seen
+ while SERVER_QUIT not in self.buffer:
data = conn.recv(1)
if not data:
break
- buffer = buffer + data
- while buffer:
- n = conn.send(buffer)
- buffer = buffer[n:]
+ self.buffer = self.buffer + data
+
+ # remove the SERVER_QUIT message
+ self.buffer = self.buffer.replace(SERVER_QUIT, b'')
+
+ # re-send entire set of collected data
+ try:
+ # this may fail on some tests, such as test_close_when_done, since
+ # the client closes the channel when it's done sending
+ while self.buffer:
+ n = conn.send(self.buffer[:self.chunk_size])
+ time.sleep(0.001)
+ self.buffer = self.buffer[n:]
+ except:
+ pass
+
conn.close()
sock.close()
@@ -33,7 +51,7 @@ class echo_client(asynchat.async_chat):
def __init__(self, terminator):
asynchat.async_chat.__init__(self)
- self.contents = None
+ self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, PORT))
self.set_terminator(terminator)
@@ -41,53 +59,194 @@ class echo_client(asynchat.async_chat):
def handle_connect(self):
pass
- ##print "Connected"
+
+ if sys.platform == 'darwin':
+ # select.poll returns a select.POLLHUP at the end of the tests
+ # on darwin, so just ignore it
+ def handle_expt(self):
+ pass
def collect_incoming_data(self, data):
- self.buffer = self.buffer + data
+ self.buffer += data
def found_terminator(self):
- #print "Received:", repr(self.buffer)
- self.contents = self.buffer
+ self.contents.append(self.buffer)
self.buffer = b""
- self.close()
class TestAsynchat(unittest.TestCase):
+ usepoll = False
+
def setUp (self):
pass
def tearDown (self):
pass
- def test_line_terminator(self):
+ def line_terminator_check(self, term, server_chunk):
+ s = echo_server()
+ s.chunk_size = server_chunk
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(term)
+ c.push(b"hello ")
+ c.push(bytes("world%s" % term))
+ c.push(bytes("I'm not dead yet!%s" % term))
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ # the line terminator tests below check receiving variously-sized
+ # chunks back from the server in order to exercise all branches of
+ # async_chat.handle_read
+
+ def test_line_terminator1(self):
+ # test one-character terminator
+ for l in (1,2,3):
+ self.line_terminator_check(b'\n', l)
+
+ def test_line_terminator2(self):
+ # test two-character terminator
+ for l in (1,2,3):
+ self.line_terminator_check(b'\r\n', l)
+
+ def test_line_terminator3(self):
+ # test three-character terminator
+ for l in (1,2,3):
+ self.line_terminator_check(b'qqq', l)
+
+ def numeric_terminator_check(self, termlen):
+ # Try reading a fixed number of bytes
s = echo_server()
s.start()
- time.sleep(1) # Give server time to initialize
- c = echo_client('\n')
- c.push("hello ")
- c.push("world\n")
- asyncore.loop()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(termlen)
+ data = b"hello world, I'm not dead yet!\n"
+ c.push(data)
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
- self.assertEqual(c.contents, b'hello world')
+ self.assertEqual(c.contents, [data[:termlen]])
- def test_numeric_terminator(self):
+ def test_numeric_terminator1(self):
+ # check that ints & longs both work (since type is
+ # explicitly checked in async_chat.handle_read)
+ self.numeric_terminator_check(1)
+
+ def test_numeric_terminator2(self):
+ self.numeric_terminator_check(6)
+
+ def test_none_terminator(self):
# Try reading a fixed number of bytes
s = echo_server()
s.start()
- time.sleep(1) # Give server time to initialize
- c = echo_client(6)
- c.push("hello ")
- c.push("world\n")
- asyncore.loop()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(None)
+ data = b"hello world, I'm not dead yet!\n"
+ c.push(data)
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [])
+ self.assertEqual(c.buffer, data)
+
+ def test_simple_producer(self):
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ data = b"hello world\nI'm not dead yet!\n"
+ p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
+ c.push_with_producer(p)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ def test_string_producer(self):
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ data = b"hello world\nI'm not dead yet!\n"
+ c.push_with_producer(data+SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ def test_empty_line(self):
+ # checks that empty lines are handled correctly
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ c.push("hello world\n\nI'm not dead yet!\n")
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ s.join()
+
+ self.assertEqual(c.contents,
+ [b"hello world", b"", b"I'm not dead yet!"])
+
+ def test_close_when_done(self):
+ s = echo_server()
+ s.start()
+ time.sleep(0.5) # Give server time to initialize
+ c = echo_client(b'\n')
+ c.push("hello world\nI'm not dead yet!\n")
+ c.push(SERVER_QUIT)
+ c.close_when_done()
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
- self.assertEqual(c.contents, b'hello ')
+ self.assertEqual(c.contents, [])
+ # the server might have been able to send a byte or two back, but this
+ # at least checks that it received something and didn't just fail
+ # (which could still result in the client not having received anything)
+ self.assertTrue(len(s.buffer) > 0)
+
+
+class TestAsynchat_WithPoll(TestAsynchat):
+ usepoll = True
+
+class TestHelperFunctions(unittest.TestCase):
+ def test_find_prefix_at_end(self):
+ self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
+ self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
+
+class TestFifo(unittest.TestCase):
+ def test_basic(self):
+ f = asynchat.fifo()
+ f.push(7)
+ f.push(b'a')
+ self.assertEqual(len(f), 2)
+ self.assertEqual(f.first(), 7)
+ self.assertEqual(f.pop(), (1, 7))
+ self.assertEqual(len(f), 1)
+ self.assertEqual(f.first(), b'a')
+ self.assertEqual(f.is_empty(), False)
+ self.assertEqual(f.pop(), (1, b'a'))
+ self.assertEqual(len(f), 0)
+ self.assertEqual(f.is_empty(), True)
+ self.assertEqual(f.pop(), (0, None))
+
+ def test_given_list(self):
+ f = asynchat.fifo([b'x', 17, 3])
+ self.assertEqual(len(f), 3)
+ self.assertEqual(f.pop(), (1, b'x'))
+ self.assertEqual(f.pop(), (1, 17))
+ self.assertEqual(f.pop(), (1, 3))
+ self.assertEqual(f.pop(), (0, None))
def test_main(verbose=None):
- test_support.run_unittest(TestAsynchat)
+ test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
+ TestHelperFunctions, TestFifo)
if __name__ == "__main__":
test_main(verbose=True)
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 6f848a7..33c2fb2 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -12,7 +12,7 @@ from test.test_support import TESTFN, run_unittest, unlink
from io import StringIO, BytesIO
HOST = "127.0.0.1"
-PORT = 54329
+PORT = None
class dummysocket:
def __init__(self):
@@ -53,12 +53,14 @@ class crashingdummy:
# used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", PORT))
- serv.listen(5)
try:
+ serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ serv.settimeout(3)
+ serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ serv.bind(("", 0))
+ global PORT
+ PORT = serv.getsockname()[1]
+ serv.listen(5)
conn, addr = serv.accept()
except socket.timeout:
pass
@@ -79,6 +81,7 @@ def capture_server(evt, buf):
conn.close()
finally:
serv.close()
+ PORT = None
evt.set()
@@ -107,87 +110,83 @@ class HelperFunctionTests(unittest.TestCase):
asyncore._exception(tr2)
self.assertEqual(tr2.error_handled, True)
-## Commented out these tests because test a non-documented function
-## (which is actually public, why it's not documented?). Anyway, the
-## tests *and* the function uses constants in the select module that
-## are not present in Windows systems (see this thread:
-## http://mail.python.org/pipermail/python-list/2001-October/109973.html)
-## Note even that these constants are mentioned in the select
-## documentation, as a parameter of "poll" method "register", but are
-## not explicit declared as constants of the module.
-## . Facundo Batista
-##
-## def test_readwrite(self):
-## # Check that correct methods are called by readwrite()
-##
-## class testobj:
-## def __init__(self):
-## self.read = False
-## self.write = False
-## self.expt = False
-##
-## def handle_read_event(self):
-## self.read = True
-##
-## def handle_write_event(self):
-## self.write = True
-##
-## def handle_expt_event(self):
-## self.expt = True
-##
-## def handle_error(self):
-## self.error_handled = True
-##
-## for flag in (select.POLLIN, select.POLLPRI):
-## tobj = testobj()
-## self.assertEqual(tobj.read, False)
-## asyncore.readwrite(tobj, flag)
-## self.assertEqual(tobj.read, True)
-##
-## # check that ExitNow exceptions in the object handler method
-## # bubbles all the way up through asyncore readwrite call
-## tr1 = exitingdummy()
-## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
-##
-## # check that an exception other than ExitNow in the object handler
-## # method causes the handle_error method to get called
-## tr2 = crashingdummy()
-## asyncore.readwrite(tr2, flag)
-## self.assertEqual(tr2.error_handled, True)
-##
-## tobj = testobj()
-## self.assertEqual(tobj.write, False)
-## asyncore.readwrite(tobj, select.POLLOUT)
-## self.assertEqual(tobj.write, True)
-##
-## # check that ExitNow exceptions in the object handler method
-## # bubbles all the way up through asyncore readwrite call
-## tr1 = exitingdummy()
-## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
-## select.POLLOUT)
-##
-## # check that an exception other than ExitNow in the object handler
-## # method causes the handle_error method to get called
-## tr2 = crashingdummy()
-## asyncore.readwrite(tr2, select.POLLOUT)
-## self.assertEqual(tr2.error_handled, True)
-##
-## for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
-## tobj = testobj()
-## self.assertEqual(tobj.expt, False)
-## asyncore.readwrite(tobj, flag)
-## self.assertEqual(tobj.expt, True)
-##
-## # check that ExitNow exceptions in the object handler method
-## # bubbles all the way up through asyncore readwrite calls
-## tr1 = exitingdummy()
-## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
-##
-## # check that an exception other than ExitNow in the object handler
-## # method causes the handle_error method to get called
-## tr2 = crashingdummy()
-## asyncore.readwrite(tr2, flag)
-## self.assertEqual(tr2.error_handled, True)
+ # asyncore.readwrite uses constants in the select module that
+ # are not present in Windows systems (see this thread:
+ # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
+ # These constants should be present as long as poll is available
+
+ if hasattr(select, 'poll'):
+ def test_readwrite(self):
+ # Check that correct methods are called by readwrite()
+
+ class testobj:
+ def __init__(self):
+ self.read = False
+ self.write = False
+ self.expt = False
+
+ def handle_read_event(self):
+ self.read = True
+
+ def handle_write_event(self):
+ self.write = True
+
+ def handle_expt_event(self):
+ self.expt = True
+
+ def handle_error(self):
+ self.error_handled = True
+
+ for flag in (select.POLLIN, select.POLLPRI):
+ tobj = testobj()
+ self.assertEqual(tobj.read, False)
+ asyncore.readwrite(tobj, flag)
+ self.assertEqual(tobj.read, True)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite call
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.readwrite(tr2, flag)
+ self.assertEqual(tr2.error_handled, True)
+
+ tobj = testobj()
+ self.assertEqual(tobj.write, False)
+ asyncore.readwrite(tobj, select.POLLOUT)
+ self.assertEqual(tobj.write, True)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite call
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
+ select.POLLOUT)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.readwrite(tr2, select.POLLOUT)
+ self.assertEqual(tr2.error_handled, True)
+
+ for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
+ tobj = testobj()
+ self.assertEqual(tobj.expt, False)
+ asyncore.readwrite(tobj, flag)
+ self.assertEqual(tobj.expt, True)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite calls
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.readwrite(tr2, flag)
+ self.assertEqual(tr2.error_handled, True)
def test_closeall(self):
self.closeall_check(False)
@@ -343,12 +342,26 @@ class DispatcherWithSendTests(unittest.TestCase):
self.evt = threading.Event()
cap = BytesIO()
threading.Thread(target=capture_server, args=(self.evt, cap)).start()
- time.sleep(1) # Give server time to initialize
- data = b"Suppose there isn't a 16-ton weight?"*5
+ # wait until server thread has assigned a port number
+ n = 1000
+ while PORT is None and n > 0:
+ time.sleep(0.01)
+ n -= 1
+
+ # wait a little longer for the server to initialize (it sometimes
+ # refuses connections on slow machines without this wait)
+ time.sleep(0.2)
+
+ data = b"Suppose there isn't a 16-ton weight?"
d = dispatcherwithsend_noread()
d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
d.connect((HOST, PORT))
+
+ # give time for socket to connect
+ time.sleep(0.1)
+
+ d.send(data)
d.send(data)
d.send(b'\n')
@@ -359,7 +372,7 @@ class DispatcherWithSendTests(unittest.TestCase):
self.evt.wait()
- self.assertEqual(cap.getvalue(), data)
+ self.assertEqual(cap.getvalue(), data*2)
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
diff --git a/Lib/test/test_codecmaps_cn.py b/Lib/test/test_codecmaps_cn.py
index 75541ac..344fc56 100644
--- a/Lib/test/test_codecmaps_cn.py
+++ b/Lib/test/test_codecmaps_cn.py
@@ -19,6 +19,13 @@ class TestGBKMap(test_multibytecodec_support.TestBase_Mapping,
mapfileurl = 'http://www.unicode.org/Public/MAPPINGS/VENDORS/' \
'MICSFT/WINDOWS/CP936.TXT'
+class TestGB18030Map(test_multibytecodec_support.TestBase_Mapping,
+ unittest.TestCase):
+ encoding = 'gb18030'
+ mapfileurl = 'http://source.icu-project.org/repos/icu/data/' \
+ 'trunk/charset/data/xml/gb-18030-2000.xml'
+
+
def test_main():
test_support.run_unittest(__name__)
diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py
index e5c6ead..c28902b 100644
--- a/Lib/test/test_math.py
+++ b/Lib/test/test_math.py
@@ -12,7 +12,11 @@ class MathTests(unittest.TestCase):
def ftest(self, name, value, expected):
if abs(value-expected) > eps:
- self.fail('%s returned %f, expected %f'%\
+ # Use %r instead of %f so the error message
+ # displays full precision. Otherwise discrepancies
+ # in the last few bits will lead to very confusing
+ # error messages
+ self.fail('%s returned %r, expected %r' %
(name, value, expected))
def testConstants(self):
@@ -92,6 +96,10 @@ class MathTests(unittest.TestCase):
self.ftest('floor(-0.5)', math.floor(-0.5), -1)
self.ftest('floor(-1.0)', math.floor(-1.0), -1)
self.ftest('floor(-1.5)', math.floor(-1.5), -2)
+ # pow() relies on floor() to check for integers
+ # This fails on some platforms - so check it here
+ self.ftest('floor(1.23e167)', math.floor(1.23e167), 1.23e167)
+ self.ftest('floor(-1.23e167)', math.floor(-1.23e167), -1.23e167)
def testFmod(self):
self.assertRaises(TypeError, math.fmod)
diff --git a/Lib/test/test_multibytecodec_support.py b/Lib/test/test_multibytecodec_support.py
index 40a67ea..2073223 100644
--- a/Lib/test/test_multibytecodec_support.py
+++ b/Lib/test/test_multibytecodec_support.py
@@ -5,7 +5,7 @@
#
import sys, codecs, os.path
-import unittest
+import unittest, re
from test import test_support
from io import BytesIO
@@ -272,6 +272,12 @@ class TestBase_Mapping(unittest.TestCase):
return test_support.open_urlresource(self.mapfileurl)
def test_mapping_file(self):
+ if self.mapfileurl.endswith('.xml'):
+ self._test_mapping_file_ucm()
+ else:
+ self._test_mapping_file_plain()
+
+ def _test_mapping_file_plain(self):
unichrs = lambda s: ''.join(map(chr, map(eval, s.split('+'))))
urt_wa = {}
@@ -303,6 +309,14 @@ class TestBase_Mapping(unittest.TestCase):
self._testpoint(csetch, unich)
+ def _test_mapping_file_ucm(self):
+ ucmdata = self.open_mapping_file().read()
+ uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata)
+ for uni, coded in uc:
+ unich = chr(int(uni, 16))
+ codech = bytes(int(c, 16) for c in coded.split())
+ self._testpoint(codech, unich)
+
def test_mapping_supplemental(self):
for mapping in self.supmaps:
self._testpoint(*mapping)
diff --git a/Lib/test/test_pow.py b/Lib/test/test_pow.py
index 62c641b..9b358f1 100644
--- a/Lib/test/test_pow.py
+++ b/Lib/test/test_pow.py
@@ -106,6 +106,9 @@ class PowTest(unittest.TestCase):
# platform pow() was buggy, and Python didn't worm around it.
eq = self.assertEquals
a = -1.0
+ # The next two tests can still fail if the platform floor()
+ # function doesn't treat all large inputs as integers
+ # test_math should also fail if that is happening
eq(pow(a, 1.23e167), 1.0)
eq(pow(a, -1.23e167), 1.0)
for b in range(-10, 11):
diff --git a/Lib/test/test_resource.py b/Lib/test/test_resource.py
index 2450e78..25fea97 100644
--- a/Lib/test/test_resource.py
+++ b/Lib/test/test_resource.py
@@ -49,17 +49,24 @@ class ResourceTest(unittest.TestCase):
except ValueError:
limit_set = False
f = open(test_support.TESTFN, "wb")
- f.write("X" * 1024)
try:
- f.write("Y")
- f.flush()
- except IOError:
- if not limit_set:
- raise
- f.close()
- os.unlink(test_support.TESTFN)
+ f.write("X" * 1024)
+ try:
+ f.write("Y")
+ f.flush()
+ except IOError:
+ if not limit_set:
+ raise
+ if limit_set:
+ # Close will attempt to flush the byte we wrote
+ # Restore limit first to avoid getting a spurious error
+ resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
+ finally:
+ f.close()
+ os.unlink(test_support.TESTFN)
finally:
- resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
+ if limit_set:
+ resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
def test_fsize_toobig(self):
# Be sure that setrlimit is checking for really large values
diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py
index 868662d..d10e40f 100644
--- a/Lib/test/test_runpy.py
+++ b/Lib/test/test_runpy.py
@@ -21,12 +21,12 @@ class RunModuleCodeTest(unittest.TestCase):
"# Check the sys module\n"
"import sys\n"
"run_argv0 = sys.argv[0]\n"
- "if __name__ in sys.modules:\n"
- " run_name = sys.modules[__name__].__name__\n"
+ "run_name_in_sys_modules = __name__ in sys.modules\n"
+ "if run_name_in_sys_modules:\n"
+ " module_in_sys_modules = globals() is sys.modules[__name__].__dict__\n"
"# Check nested operation\n"
"import runpy\n"
- "nested = runpy._run_module_code('x=1\\n', mod_name='<run>',\n"
- " alter_sys=True)\n"
+ "nested = runpy._run_module_code('x=1\\n', mod_name='<run>')\n"
)
@@ -37,34 +37,44 @@ class RunModuleCodeTest(unittest.TestCase):
loader = "Now you're just being silly"
d1 = dict(initial=initial)
saved_argv0 = sys.argv[0]
- d2 = _run_module_code(self.test_source,
- d1,
- name,
- file,
- loader,
- True)
- self.failUnless("result" not in d1)
- self.failUnless(d2["initial"] is initial)
- self.assertEqual(d2["result"], self.expected_result)
- self.assertEqual(d2["nested"]["x"], 1)
- self.failUnless(d2["__name__"] is name)
- self.failUnless(d2["run_name"] is name)
- self.failUnless(d2["__file__"] is file)
- self.failUnless(d2["run_argv0"] is file)
- self.failUnless(d2["__loader__"] is loader)
- self.failUnless(sys.argv[0] is saved_argv0)
- self.failUnless(name not in sys.modules)
+ try:
+ d2 = _run_module_code(self.test_source,
+ d1,
+ name,
+ file,
+ loader,
+ alter_sys=True)
+ self.failUnless("result" not in d1)
+ self.failUnless(d2["initial"] is initial)
+ self.assertEqual(d2["result"], self.expected_result)
+ self.assertEqual(d2["nested"]["x"], 1)
+ self.assertEqual(d2["nested"]["__name__"], "<run>")
+ self.failUnless(d2["__name__"] is name)
+ self.failUnless(d2["__file__"] is file)
+ self.failUnless(d2["__loader__"] is loader)
+ self.failUnless(d2["run_argv0"] is file)
+ self.failUnless(d2["run_name_in_sys_modules"])
+ self.failUnless(d2["module_in_sys_modules"])
+ self.failUnless(sys.argv[0] is not saved_argv0)
+ self.failUnless(name in sys.modules)
+ finally:
+ sys.argv[0] = saved_argv0
+ if name in sys.modules:
+ del sys.modules[name]
def test_run_module_code_defaults(self):
saved_argv0 = sys.argv[0]
d = _run_module_code(self.test_source)
self.assertEqual(d["result"], self.expected_result)
+ self.failUnless(d["nested"]["x"] == 1)
+ self.failUnless(d["nested"]["__name__"] == "<run>")
self.failUnless(d["__name__"] is None)
self.failUnless(d["__file__"] is None)
self.failUnless(d["__loader__"] is None)
self.failUnless(d["run_argv0"] is saved_argv0)
- self.failUnless("run_name" not in d)
+ self.failUnless(not d["run_name_in_sys_modules"])
self.failUnless(sys.argv[0] is saved_argv0)
+ self.failUnless(None not in sys.modules)
class RunModuleTest(unittest.TestCase):
@@ -77,19 +87,29 @@ class RunModuleTest(unittest.TestCase):
self.fail("Expected import error for " + mod_name)
def test_invalid_names(self):
+ # Builtin module
self.expect_import_error("sys")
+ # Non-existent modules
self.expect_import_error("sys.imp.eric")
self.expect_import_error("os.path.half")
self.expect_import_error("a.bee")
self.expect_import_error(".howard")
self.expect_import_error("..eaten")
+ # Package
+ self.expect_import_error("logging")
def test_library_module(self):
run_module("runpy")
+ def _add_pkg_dir(self, pkg_dir):
+ os.mkdir(pkg_dir)
+ pkg_fname = os.path.join(pkg_dir, "__init__"+os.extsep+"py")
+ pkg_file = open(pkg_fname, "w")
+ pkg_file.close()
+ return pkg_fname
+
def _make_pkg(self, source, depth):
pkg_name = "__runpy_pkg__"
- init_fname = "__init__"+os.extsep+"py"
test_fname = "runpy_test"+os.extsep+"py"
pkg_dir = sub_dir = tempfile.mkdtemp()
if verbose: print(" Package tree in:", sub_dir)
@@ -97,11 +117,8 @@ class RunModuleTest(unittest.TestCase):
if verbose: print(" Updated sys.path:", sys.path[0])
for i in range(depth):
sub_dir = os.path.join(sub_dir, pkg_name)
- os.mkdir(sub_dir)
+ pkg_fname = self._add_pkg_dir(sub_dir)
if verbose: print(" Next level in:", sub_dir)
- pkg_fname = os.path.join(sub_dir, init_fname)
- pkg_file = open(pkg_fname, "w")
- pkg_file.close()
if verbose: print(" Created:", pkg_fname)
mod_fname = os.path.join(sub_dir, test_fname)
mod_file = open(mod_fname, "w")
@@ -112,13 +129,9 @@ class RunModuleTest(unittest.TestCase):
return pkg_dir, mod_fname, mod_name
def _del_pkg(self, top, depth, mod_name):
- for i in range(depth+1): # Don't forget the module itself
- parts = mod_name.rsplit(".", i)
- entry = parts[0]
- try:
+ for entry in list(sys.modules):
+ if entry.startswith("__runpy_pkg__"):
del sys.modules[entry]
- except KeyError as ex:
- if verbose: print(ex) # Persist with cleaning up
if verbose: print(" Removed sys.modules entries")
del sys.path[0]
if verbose: print(" Removed sys.path entry")
@@ -146,23 +159,81 @@ class RunModuleTest(unittest.TestCase):
try:
if verbose: print("Running from source:", mod_name)
d1 = run_module(mod_name) # Read from source
+ self.failUnless("x" in d1)
self.assertEqual(d1["x"], 1)
del d1 # Ensure __loader__ entry doesn't keep file open
__import__(mod_name)
os.remove(mod_fname)
if verbose: print("Running from compiled:", mod_name)
d2 = run_module(mod_name) # Read from bytecode
+ self.failUnless("x" in d2)
self.assertEqual(d2["x"], 1)
del d2 # Ensure __loader__ entry doesn't keep file open
finally:
self._del_pkg(pkg_dir, depth, mod_name)
if verbose: print("Module executed successfully")
+ def _add_relative_modules(self, base_dir, depth):
+ if depth <= 1:
+ raise ValueError("Relative module test needs depth > 1")
+ pkg_name = "__runpy_pkg__"
+ module_dir = base_dir
+ for i in range(depth):
+ parent_dir = module_dir
+ module_dir = os.path.join(module_dir, pkg_name)
+ # Add sibling module
+ sibling_fname = os.path.join(module_dir, "sibling"+os.extsep+"py")
+ sibling_file = open(sibling_fname, "w")
+ sibling_file.close()
+ if verbose: print(" Added sibling module:", sibling_fname)
+ # Add nephew module
+ uncle_dir = os.path.join(parent_dir, "uncle")
+ self._add_pkg_dir(uncle_dir)
+ if verbose: print(" Added uncle package:", uncle_dir)
+ cousin_dir = os.path.join(uncle_dir, "cousin")
+ self._add_pkg_dir(cousin_dir)
+ if verbose: print(" Added cousin package:", cousin_dir)
+ nephew_fname = os.path.join(cousin_dir, "nephew"+os.extsep+"py")
+ nephew_file = open(nephew_fname, "w")
+ nephew_file.close()
+ if verbose: print(" Added nephew module:", nephew_fname)
+
+ def _check_relative_imports(self, depth, run_name=None):
+ contents = """\
+from __future__ import absolute_import
+from . import sibling
+from ..uncle.cousin import nephew
+"""
+ pkg_dir, mod_fname, mod_name = (
+ self._make_pkg(contents, depth))
+ try:
+ self._add_relative_modules(pkg_dir, depth)
+ if verbose: print("Running from source:", mod_name)
+ d1 = run_module(mod_name) # Read from source
+ self.failUnless("sibling" in d1)
+ self.failUnless("nephew" in d1)
+ del d1 # Ensure __loader__ entry doesn't keep file open
+ __import__(mod_name)
+ os.remove(mod_fname)
+ if verbose: print("Running from compiled:", mod_name)
+ d2 = run_module(mod_name) # Read from bytecode
+ self.failUnless("sibling" in d2)
+ self.failUnless("nephew" in d2)
+ del d2 # Ensure __loader__ entry doesn't keep file open
+ finally:
+ self._del_pkg(pkg_dir, depth, mod_name)
+ if verbose: print("Module executed successfully")
+
def test_run_module(self):
for depth in range(4):
if verbose: print("Testing package depth:", depth)
self._check_module(depth)
+ def test_explicit_relative_import(self):
+ for depth in range(2, 5):
+ if verbose: print("Testing relative imports at depth:", depth)
+ self._check_relative_imports(depth)
+
def test_main():
run_unittest(RunModuleCodeTest)
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 3542ddb..cf92662 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,53 +1,100 @@
+import asyncore
import socket
import threading
+import smtpd
import smtplib
+import io
+import sys
import time
+import select
from unittest import TestCase
from test import test_support
+# PORT is used to communicate the port number assigned to the server
+# to the test client
+HOST = "localhost"
+PORT = None
-def server(evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 9091))
- serv.listen(5)
+def server(evt, buf):
try:
+ serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ serv.settimeout(3)
+ serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ serv.bind(("", 0))
+ global PORT
+ PORT = serv.getsockname()[1]
+ serv.listen(5)
conn, addr = serv.accept()
except socket.timeout:
pass
else:
- conn.send("220 Hola mundo\n")
+ n = 500
+ while buf and n > 0:
+ r, w, e = select.select([], [conn], [])
+ if w:
+ sent = conn.send(buf)
+ buf = buf[sent:]
+
+ n -= 1
+ time.sleep(0.01)
+
conn.close()
finally:
serv.close()
+ PORT = None
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
- time.sleep(.1)
+ servargs = (self.evt, "220 Hola mundo\n")
+ threading.Thread(target=server, args=servargs).start()
+
+ # wait until server thread has assigned a port number
+ n = 500
+ while PORT is None and n > 0:
+ time.sleep(0.01)
+ n -= 1
+
+ # wait a little longer (sometimes connections are refused
+ # on slow machines without this additional wait)
+ time.sleep(0.5)
def tearDown(self):
self.evt.wait()
- def testBasic(self):
+ def testBasic1(self):
# connects
- smtp = smtplib.SMTP("localhost", 9091)
+ smtp = smtplib.SMTP(HOST, PORT)
+ smtp.sock.close()
+
+ def testBasic2(self):
+ # connects, include port in host name
+ smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
+ smtp.sock.close()
+
+ def testLocalHostName(self):
+ # check that supplied local_hostname is used
+ smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
+ self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
+ def testNonnumericPort(self):
+ # check that non-numeric port raises ValueError
+ self.assertRaises(socket.error, smtplib.SMTP,
+ "localhost", "bogus")
+
def testTimeoutDefault(self):
# default
- smtp = smtplib.SMTP("localhost", 9091)
+ smtp = smtplib.SMTP(HOST, PORT)
self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close()
def testTimeoutValue(self):
# a value
- smtp = smtplib.SMTP("localhost", 9091, timeout=30)
+ smtp = smtplib.SMTP(HOST, PORT, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
@@ -56,16 +103,149 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- smtp = smtplib.SMTP("localhost", 9091, timeout=None)
+ smtp = smtplib.SMTP(HOST, PORT, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
+# Test server using smtpd.DebuggingServer
+def debugging_server(serv_evt, client_evt):
+ serv = smtpd.DebuggingServer(("", 0), ('nowhere', -1))
+ global PORT
+ PORT = serv.getsockname()[1]
+
+ try:
+ if hasattr(select, 'poll'):
+ poll_fun = asyncore.poll2
+ else:
+ poll_fun = asyncore.poll
+
+ n = 1000
+ while asyncore.socket_map and n > 0:
+ poll_fun(0.01, asyncore.socket_map)
+
+ # when the client conversation is finished, it will
+ # set client_evt, and it's then ok to kill the server
+ if client_evt.isSet():
+ serv.close()
+ break
+
+ n -= 1
+
+ except socket.timeout:
+ pass
+ finally:
+ # allow some time for the client to read the result
+ time.sleep(0.5)
+ serv.close()
+ asyncore.close_all()
+ PORT = None
+ time.sleep(0.5)
+ serv_evt.set()
+
+MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
+MSG_END = '------------ END MESSAGE ------------\n'
+
+# Test behavior of smtpd.DebuggingServer
+# NOTE: the SMTP objects are created with a non-default local_hostname
+# argument to the constructor, since (on some systems) the FQDN lookup
+# caused by the default local_hostname sometimes takes so long that the
+# test server times out, causing the test to fail.
+class DebuggingServerTests(TestCase):
+
+ def setUp(self):
+ # temporarily replace sys.stdout to capture DebuggingServer output
+ self.old_stdout = sys.stdout
+ self.output = io.StringIO()
+ sys.stdout = self.output
+
+ self.serv_evt = threading.Event()
+ self.client_evt = threading.Event()
+ serv_args = (self.serv_evt, self.client_evt)
+ threading.Thread(target=debugging_server, args=serv_args).start()
+
+ # wait until server thread has assigned a port number
+ n = 500
+ while PORT is None and n > 0:
+ time.sleep(0.01)
+ n -= 1
+
+ # wait a little longer (sometimes connections are refused
+ # on slow machines without this additional wait)
+ time.sleep(0.5)
+
+ def tearDown(self):
+ # indicate that the client is finished
+ self.client_evt.set()
+ # wait for the server thread to terminate
+ self.serv_evt.wait()
+ # restore sys.stdout
+ sys.stdout = self.old_stdout
+
+ def testBasic(self):
+ # connect
+ smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp.quit()
+
+ def testEHLO(self):
+ smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ expected = (502, b'Error: command "EHLO" not implemented')
+ self.assertEqual(smtp.ehlo(), expected)
+ smtp.quit()
+
+ def testHELP(self):
+ smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented')
+ smtp.quit()
+
+ def testSend(self):
+ # connect and send mail
+ m = 'A test message'
+ smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp.sendmail('John', 'Sally', m)
+ smtp.quit()
+
+ self.client_evt.set()
+ self.serv_evt.wait()
+ self.output.flush()
+ mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
+ self.assertEqual(self.output.getvalue(), mexpect)
+
+
+class BadHELOServerTests(TestCase):
+
+ def setUp(self):
+ self.old_stdout = sys.stdout
+ self.output = io.StringIO()
+ sys.stdout = self.output
+
+ self.evt = threading.Event()
+ servargs = (self.evt, b"199 no hello for you!\n")
+ threading.Thread(target=server, args=servargs).start()
+
+ # wait until server thread has assigned a port number
+ n = 500
+ while PORT is None and n > 0:
+ time.sleep(0.01)
+ n -= 1
+
+ # wait a little longer (sometimes connections are refused
+ # on slow machines without this additional wait)
+ time.sleep(0.5)
+
+ def tearDown(self):
+ self.evt.wait()
+ sys.stdout = self.old_stdout
+
+ def testFailingHELO(self):
+ self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
+ HOST, PORT, 'localhost', 3)
def test_main(verbose=None):
- test_support.run_unittest(GeneralTests)
+ test_support.run_unittest(GeneralTests, DebuggingServerTests,
+ BadHELOServerTests)
if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_socket_ssl.py b/Lib/test/test_socket_ssl.py
index 42efb6e..fd4383f 100644
--- a/Lib/test/test_socket_ssl.py
+++ b/Lib/test/test_socket_ssl.py
@@ -106,6 +106,25 @@ class BasicTests(unittest.TestCase):
connector()
t.join()
+ def test_978833(self):
+ if test_support.verbose:
+ print("test_978833 ...")
+
+ import os, httplib
+ with test_support.transient_internet():
+ s = socket.socket(socket.AF_INET)
+ s.connect(("www.sf.net", 443))
+ fd = s.fileno()
+ sock = httplib.FakeSocket(s, socket.ssl(s))
+ s = None
+ sock.close()
+ try:
+ os.fstat(fd)
+ except OSError:
+ pass
+ else:
+ raise test_support.TestFailed("Failed to close socket")
+
class OpenSSLTests(unittest.TestCase):
def testBasic(self):
diff --git a/Lib/test/test_unicodedata.py b/Lib/test/test_unicodedata.py
index 95706b2..91c4700 100644
--- a/Lib/test/test_unicodedata.py
+++ b/Lib/test/test_unicodedata.py
@@ -214,6 +214,9 @@ class UnicodeMiscTest(UnicodeDatabaseTest):
count += 1
self.assert_(count >= 10) # should have tested at least the ASCII digits
+ def test_bug_1704793(self):
+ self.assertEquals(self.db.lookup("GOTHIC LETTER FAIHU"), '\U00010346')
+
def test_main():
test.test_support.run_unittest(
UnicodeMiscTest,
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py
index 737ecbd..a126262 100644
--- a/Lib/test/test_urllib2_localnet.py
+++ b/Lib/test/test_urllib2_localnet.py
@@ -40,14 +40,16 @@ class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
class LoopbackHttpServerThread(threading.Thread):
"""Stoppable thread that runs a loopback http server."""
- def __init__(self, port, RequestHandlerClass):
+ def __init__(self, request_handler):
threading.Thread.__init__(self)
- self._RequestHandlerClass = RequestHandlerClass
self._stop = False
- self._port = port
- self._server_address = ('127.0.0.1', self._port)
self.ready = threading.Event()
- self.error = None
+ request_handler.protocol_version = "HTTP/1.0"
+ self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
+ request_handler)
+ #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
+ # self.httpd.server_port)
+ self.port = self.httpd.server_port
def stop(self):
"""Stops the webserver if it's currently running."""
@@ -58,24 +60,9 @@ class LoopbackHttpServerThread(threading.Thread):
self.join()
def run(self):
- protocol = "HTTP/1.0"
-
- try:
- self._RequestHandlerClass.protocol_version = protocol
- httpd = LoopbackHttpServer(self._server_address,
- self._RequestHandlerClass)
-
- sa = httpd.socket.getsockname()
- #print "Serving HTTP on", sa[0], "port", sa[1], "..."
- except:
- # Fail "gracefully" if we are unable to start.
- self.ready.set()
- self.error = sys.exc_info()[1]
- raise
-
self.ready.set()
while not self._stop:
- httpd.handle_request()
+ self.httpd.handle_request()
# Authentication infrastructure
@@ -232,26 +219,21 @@ class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
class ProxyAuthTests(unittest.TestCase):
URL = "http://www.foo.com"
- PORT = 8080
USER = "tester"
PASSWD = "test123"
REALM = "TestRealm"
- PROXY_URL = "http://127.0.0.1:%d" % PORT
-
def setUp(self):
FakeProxyHandler.digest_auth_handler.set_users({
self.USER : self.PASSWD
})
FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
- self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler)
+ self.server = LoopbackHttpServerThread(FakeProxyHandler)
self.server.start()
self.server.ready.wait()
- if self.server.error:
- raise self.server.error
-
- handler = urllib2.ProxyHandler({"http" : self.PROXY_URL})
+ proxy_url = "http://127.0.0.1:%d" % self.server.port
+ handler = urllib2.ProxyHandler({"http" : proxy_url})
self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
self.opener = urllib2.build_opener(handler, self._digest_auth_handler)