diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_asynchat.py | 213 | ||||
-rw-r--r-- | Lib/test/test_asyncore.py | 193 | ||||
-rw-r--r-- | Lib/test/test_codecmaps_cn.py | 7 | ||||
-rw-r--r-- | Lib/test/test_math.py | 10 | ||||
-rw-r--r-- | Lib/test/test_multibytecodec_support.py | 16 | ||||
-rw-r--r-- | Lib/test/test_pow.py | 3 | ||||
-rw-r--r-- | Lib/test/test_resource.py | 25 | ||||
-rw-r--r-- | Lib/test/test_runpy.py | 137 | ||||
-rw-r--r-- | Lib/test/test_smtplib.py | 210 | ||||
-rw-r--r-- | Lib/test/test_socket_ssl.py | 19 | ||||
-rw-r--r-- | Lib/test/test_unicodedata.py | 3 | ||||
-rw-r--r-- | Lib/test/test_urllib2_localnet.py | 40 |
12 files changed, 671 insertions, 205 deletions
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py index 7629296..be8e116 100644 --- a/Lib/test/test_asynchat.py +++ b/Lib/test/test_asynchat.py @@ -3,12 +3,17 @@ import thread # If this fails, we can't test this module import asyncore, asynchat, socket, threading, time import unittest +import sys from test import test_support HOST = "127.0.0.1" PORT = 54322 +SERVER_QUIT = b'QUIT\n' class echo_server(threading.Thread): + # parameter to determine the number of bytes passed back to the + # client each send + chunk_size = 1 def run(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -17,15 +22,28 @@ class echo_server(threading.Thread): PORT = test_support.bind_port(sock, HOST, PORT) sock.listen(1) conn, client = sock.accept() - buffer = b"" - while b"\n" not in buffer: + self.buffer = b"" + # collect data until quit message is seen + while SERVER_QUIT not in self.buffer: data = conn.recv(1) if not data: break - buffer = buffer + data - while buffer: - n = conn.send(buffer) - buffer = buffer[n:] + self.buffer = self.buffer + data + + # remove the SERVER_QUIT message + self.buffer = self.buffer.replace(SERVER_QUIT, b'') + + # re-send entire set of collected data + try: + # this may fail on some tests, such as test_close_when_done, since + # the client closes the channel when it's done sending + while self.buffer: + n = conn.send(self.buffer[:self.chunk_size]) + time.sleep(0.001) + self.buffer = self.buffer[n:] + except: + pass + conn.close() sock.close() @@ -33,7 +51,7 @@ class echo_client(asynchat.async_chat): def __init__(self, terminator): asynchat.async_chat.__init__(self) - self.contents = None + self.contents = [] self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((HOST, PORT)) self.set_terminator(terminator) @@ -41,53 +59,194 @@ class echo_client(asynchat.async_chat): def handle_connect(self): pass - ##print "Connected" + + if sys.platform == 'darwin': + # select.poll returns a select.POLLHUP at the end of the tests + # on darwin, so just ignore it + def handle_expt(self): + pass def collect_incoming_data(self, data): - self.buffer = self.buffer + data + self.buffer += data def found_terminator(self): - #print "Received:", repr(self.buffer) - self.contents = self.buffer + self.contents.append(self.buffer) self.buffer = b"" - self.close() class TestAsynchat(unittest.TestCase): + usepoll = False + def setUp (self): pass def tearDown (self): pass - def test_line_terminator(self): + def line_terminator_check(self, term, server_chunk): + s = echo_server() + s.chunk_size = server_chunk + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client(term) + c.push(b"hello ") + c.push(bytes("world%s" % term)) + c.push(bytes("I'm not dead yet!%s" % term)) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + + # the line terminator tests below check receiving variously-sized + # chunks back from the server in order to exercise all branches of + # async_chat.handle_read + + def test_line_terminator1(self): + # test one-character terminator + for l in (1,2,3): + self.line_terminator_check(b'\n', l) + + def test_line_terminator2(self): + # test two-character terminator + for l in (1,2,3): + self.line_terminator_check(b'\r\n', l) + + def test_line_terminator3(self): + # test three-character terminator + for l in (1,2,3): + self.line_terminator_check(b'qqq', l) + + def numeric_terminator_check(self, termlen): + # Try reading a fixed number of bytes s = echo_server() s.start() - time.sleep(1) # Give server time to initialize - c = echo_client('\n') - c.push("hello ") - c.push("world\n") - asyncore.loop() + time.sleep(0.5) # Give server time to initialize + c = echo_client(termlen) + data = b"hello world, I'm not dead yet!\n" + c.push(data) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) s.join() - self.assertEqual(c.contents, b'hello world') + self.assertEqual(c.contents, [data[:termlen]]) - def test_numeric_terminator(self): + def test_numeric_terminator1(self): + # check that ints & longs both work (since type is + # explicitly checked in async_chat.handle_read) + self.numeric_terminator_check(1) + + def test_numeric_terminator2(self): + self.numeric_terminator_check(6) + + def test_none_terminator(self): # Try reading a fixed number of bytes s = echo_server() s.start() - time.sleep(1) # Give server time to initialize - c = echo_client(6) - c.push("hello ") - c.push("world\n") - asyncore.loop() + time.sleep(0.5) # Give server time to initialize + c = echo_client(None) + data = b"hello world, I'm not dead yet!\n" + c.push(data) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, []) + self.assertEqual(c.buffer, data) + + def test_simple_producer(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client(b'\n') + data = b"hello world\nI'm not dead yet!\n" + p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) + c.push_with_producer(p) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + + def test_string_producer(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client(b'\n') + data = b"hello world\nI'm not dead yet!\n" + c.push_with_producer(data+SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + + def test_empty_line(self): + # checks that empty lines are handled correctly + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client(b'\n') + c.push("hello world\n\nI'm not dead yet!\n") + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, + [b"hello world", b"", b"I'm not dead yet!"]) + + def test_close_when_done(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client(b'\n') + c.push("hello world\nI'm not dead yet!\n") + c.push(SERVER_QUIT) + c.close_when_done() + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) s.join() - self.assertEqual(c.contents, b'hello ') + self.assertEqual(c.contents, []) + # the server might have been able to send a byte or two back, but this + # at least checks that it received something and didn't just fail + # (which could still result in the client not having received anything) + self.assertTrue(len(s.buffer) > 0) + + +class TestAsynchat_WithPoll(TestAsynchat): + usepoll = True + +class TestHelperFunctions(unittest.TestCase): + def test_find_prefix_at_end(self): + self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1) + self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0) + +class TestFifo(unittest.TestCase): + def test_basic(self): + f = asynchat.fifo() + f.push(7) + f.push(b'a') + self.assertEqual(len(f), 2) + self.assertEqual(f.first(), 7) + self.assertEqual(f.pop(), (1, 7)) + self.assertEqual(len(f), 1) + self.assertEqual(f.first(), b'a') + self.assertEqual(f.is_empty(), False) + self.assertEqual(f.pop(), (1, b'a')) + self.assertEqual(len(f), 0) + self.assertEqual(f.is_empty(), True) + self.assertEqual(f.pop(), (0, None)) + + def test_given_list(self): + f = asynchat.fifo([b'x', 17, 3]) + self.assertEqual(len(f), 3) + self.assertEqual(f.pop(), (1, b'x')) + self.assertEqual(f.pop(), (1, 17)) + self.assertEqual(f.pop(), (1, 3)) + self.assertEqual(f.pop(), (0, None)) def test_main(verbose=None): - test_support.run_unittest(TestAsynchat) + test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll, + TestHelperFunctions, TestFifo) if __name__ == "__main__": test_main(verbose=True) diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 6f848a7..33c2fb2 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -12,7 +12,7 @@ from test.test_support import TESTFN, run_unittest, unlink from io import StringIO, BytesIO HOST = "127.0.0.1" -PORT = 54329 +PORT = None class dummysocket: def __init__(self): @@ -53,12 +53,14 @@ class crashingdummy: # used when testing senders; just collects what it gets until newline is sent def capture_server(evt, buf): - serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - serv.settimeout(3) - serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - serv.bind(("", PORT)) - serv.listen(5) try: + serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv.settimeout(3) + serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + serv.bind(("", 0)) + global PORT + PORT = serv.getsockname()[1] + serv.listen(5) conn, addr = serv.accept() except socket.timeout: pass @@ -79,6 +81,7 @@ def capture_server(evt, buf): conn.close() finally: serv.close() + PORT = None evt.set() @@ -107,87 +110,83 @@ class HelperFunctionTests(unittest.TestCase): asyncore._exception(tr2) self.assertEqual(tr2.error_handled, True) -## Commented out these tests because test a non-documented function -## (which is actually public, why it's not documented?). Anyway, the -## tests *and* the function uses constants in the select module that -## are not present in Windows systems (see this thread: -## http://mail.python.org/pipermail/python-list/2001-October/109973.html) -## Note even that these constants are mentioned in the select -## documentation, as a parameter of "poll" method "register", but are -## not explicit declared as constants of the module. -## . Facundo Batista -## -## def test_readwrite(self): -## # Check that correct methods are called by readwrite() -## -## class testobj: -## def __init__(self): -## self.read = False -## self.write = False -## self.expt = False -## -## def handle_read_event(self): -## self.read = True -## -## def handle_write_event(self): -## self.write = True -## -## def handle_expt_event(self): -## self.expt = True -## -## def handle_error(self): -## self.error_handled = True -## -## for flag in (select.POLLIN, select.POLLPRI): -## tobj = testobj() -## self.assertEqual(tobj.read, False) -## asyncore.readwrite(tobj, flag) -## self.assertEqual(tobj.read, True) -## -## # check that ExitNow exceptions in the object handler method -## # bubbles all the way up through asyncore readwrite call -## tr1 = exitingdummy() -## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) -## -## # check that an exception other than ExitNow in the object handler -## # method causes the handle_error method to get called -## tr2 = crashingdummy() -## asyncore.readwrite(tr2, flag) -## self.assertEqual(tr2.error_handled, True) -## -## tobj = testobj() -## self.assertEqual(tobj.write, False) -## asyncore.readwrite(tobj, select.POLLOUT) -## self.assertEqual(tobj.write, True) -## -## # check that ExitNow exceptions in the object handler method -## # bubbles all the way up through asyncore readwrite call -## tr1 = exitingdummy() -## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, -## select.POLLOUT) -## -## # check that an exception other than ExitNow in the object handler -## # method causes the handle_error method to get called -## tr2 = crashingdummy() -## asyncore.readwrite(tr2, select.POLLOUT) -## self.assertEqual(tr2.error_handled, True) -## -## for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL): -## tobj = testobj() -## self.assertEqual(tobj.expt, False) -## asyncore.readwrite(tobj, flag) -## self.assertEqual(tobj.expt, True) -## -## # check that ExitNow exceptions in the object handler method -## # bubbles all the way up through asyncore readwrite calls -## tr1 = exitingdummy() -## self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) -## -## # check that an exception other than ExitNow in the object handler -## # method causes the handle_error method to get called -## tr2 = crashingdummy() -## asyncore.readwrite(tr2, flag) -## self.assertEqual(tr2.error_handled, True) + # asyncore.readwrite uses constants in the select module that + # are not present in Windows systems (see this thread: + # http://mail.python.org/pipermail/python-list/2001-October/109973.html) + # These constants should be present as long as poll is available + + if hasattr(select, 'poll'): + def test_readwrite(self): + # Check that correct methods are called by readwrite() + + class testobj: + def __init__(self): + self.read = False + self.write = False + self.expt = False + + def handle_read_event(self): + self.read = True + + def handle_write_event(self): + self.write = True + + def handle_expt_event(self): + self.expt = True + + def handle_error(self): + self.error_handled = True + + for flag in (select.POLLIN, select.POLLPRI): + tobj = testobj() + self.assertEqual(tobj.read, False) + asyncore.readwrite(tobj, flag) + self.assertEqual(tobj.read, True) + + # check that ExitNow exceptions in the object handler method + # bubbles all the way up through asyncore readwrite call + tr1 = exitingdummy() + self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) + + # check that an exception other than ExitNow in the object handler + # method causes the handle_error method to get called + tr2 = crashingdummy() + asyncore.readwrite(tr2, flag) + self.assertEqual(tr2.error_handled, True) + + tobj = testobj() + self.assertEqual(tobj.write, False) + asyncore.readwrite(tobj, select.POLLOUT) + self.assertEqual(tobj.write, True) + + # check that ExitNow exceptions in the object handler method + # bubbles all the way up through asyncore readwrite call + tr1 = exitingdummy() + self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, + select.POLLOUT) + + # check that an exception other than ExitNow in the object handler + # method causes the handle_error method to get called + tr2 = crashingdummy() + asyncore.readwrite(tr2, select.POLLOUT) + self.assertEqual(tr2.error_handled, True) + + for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL): + tobj = testobj() + self.assertEqual(tobj.expt, False) + asyncore.readwrite(tobj, flag) + self.assertEqual(tobj.expt, True) + + # check that ExitNow exceptions in the object handler method + # bubbles all the way up through asyncore readwrite calls + tr1 = exitingdummy() + self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) + + # check that an exception other than ExitNow in the object handler + # method causes the handle_error method to get called + tr2 = crashingdummy() + asyncore.readwrite(tr2, flag) + self.assertEqual(tr2.error_handled, True) def test_closeall(self): self.closeall_check(False) @@ -343,12 +342,26 @@ class DispatcherWithSendTests(unittest.TestCase): self.evt = threading.Event() cap = BytesIO() threading.Thread(target=capture_server, args=(self.evt, cap)).start() - time.sleep(1) # Give server time to initialize - data = b"Suppose there isn't a 16-ton weight?"*5 + # wait until server thread has assigned a port number + n = 1000 + while PORT is None and n > 0: + time.sleep(0.01) + n -= 1 + + # wait a little longer for the server to initialize (it sometimes + # refuses connections on slow machines without this wait) + time.sleep(0.2) + + data = b"Suppose there isn't a 16-ton weight?" d = dispatcherwithsend_noread() d.create_socket(socket.AF_INET, socket.SOCK_STREAM) d.connect((HOST, PORT)) + + # give time for socket to connect + time.sleep(0.1) + + d.send(data) d.send(data) d.send(b'\n') @@ -359,7 +372,7 @@ class DispatcherWithSendTests(unittest.TestCase): self.evt.wait() - self.assertEqual(cap.getvalue(), data) + self.assertEqual(cap.getvalue(), data*2) class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): diff --git a/Lib/test/test_codecmaps_cn.py b/Lib/test/test_codecmaps_cn.py index 75541ac..344fc56 100644 --- a/Lib/test/test_codecmaps_cn.py +++ b/Lib/test/test_codecmaps_cn.py @@ -19,6 +19,13 @@ class TestGBKMap(test_multibytecodec_support.TestBase_Mapping, mapfileurl = 'http://www.unicode.org/Public/MAPPINGS/VENDORS/' \ 'MICSFT/WINDOWS/CP936.TXT' +class TestGB18030Map(test_multibytecodec_support.TestBase_Mapping, + unittest.TestCase): + encoding = 'gb18030' + mapfileurl = 'http://source.icu-project.org/repos/icu/data/' \ + 'trunk/charset/data/xml/gb-18030-2000.xml' + + def test_main(): test_support.run_unittest(__name__) diff --git a/Lib/test/test_math.py b/Lib/test/test_math.py index e5c6ead..c28902b 100644 --- a/Lib/test/test_math.py +++ b/Lib/test/test_math.py @@ -12,7 +12,11 @@ class MathTests(unittest.TestCase): def ftest(self, name, value, expected): if abs(value-expected) > eps: - self.fail('%s returned %f, expected %f'%\ + # Use %r instead of %f so the error message + # displays full precision. Otherwise discrepancies + # in the last few bits will lead to very confusing + # error messages + self.fail('%s returned %r, expected %r' % (name, value, expected)) def testConstants(self): @@ -92,6 +96,10 @@ class MathTests(unittest.TestCase): self.ftest('floor(-0.5)', math.floor(-0.5), -1) self.ftest('floor(-1.0)', math.floor(-1.0), -1) self.ftest('floor(-1.5)', math.floor(-1.5), -2) + # pow() relies on floor() to check for integers + # This fails on some platforms - so check it here + self.ftest('floor(1.23e167)', math.floor(1.23e167), 1.23e167) + self.ftest('floor(-1.23e167)', math.floor(-1.23e167), -1.23e167) def testFmod(self): self.assertRaises(TypeError, math.fmod) diff --git a/Lib/test/test_multibytecodec_support.py b/Lib/test/test_multibytecodec_support.py index 40a67ea..2073223 100644 --- a/Lib/test/test_multibytecodec_support.py +++ b/Lib/test/test_multibytecodec_support.py @@ -5,7 +5,7 @@ # import sys, codecs, os.path -import unittest +import unittest, re from test import test_support from io import BytesIO @@ -272,6 +272,12 @@ class TestBase_Mapping(unittest.TestCase): return test_support.open_urlresource(self.mapfileurl) def test_mapping_file(self): + if self.mapfileurl.endswith('.xml'): + self._test_mapping_file_ucm() + else: + self._test_mapping_file_plain() + + def _test_mapping_file_plain(self): unichrs = lambda s: ''.join(map(chr, map(eval, s.split('+')))) urt_wa = {} @@ -303,6 +309,14 @@ class TestBase_Mapping(unittest.TestCase): self._testpoint(csetch, unich) + def _test_mapping_file_ucm(self): + ucmdata = self.open_mapping_file().read() + uc = re.findall('<a u="([A-F0-9]{4})" b="([0-9A-F ]+)"/>', ucmdata) + for uni, coded in uc: + unich = chr(int(uni, 16)) + codech = bytes(int(c, 16) for c in coded.split()) + self._testpoint(codech, unich) + def test_mapping_supplemental(self): for mapping in self.supmaps: self._testpoint(*mapping) diff --git a/Lib/test/test_pow.py b/Lib/test/test_pow.py index 62c641b..9b358f1 100644 --- a/Lib/test/test_pow.py +++ b/Lib/test/test_pow.py @@ -106,6 +106,9 @@ class PowTest(unittest.TestCase): # platform pow() was buggy, and Python didn't worm around it. eq = self.assertEquals a = -1.0 + # The next two tests can still fail if the platform floor() + # function doesn't treat all large inputs as integers + # test_math should also fail if that is happening eq(pow(a, 1.23e167), 1.0) eq(pow(a, -1.23e167), 1.0) for b in range(-10, 11): diff --git a/Lib/test/test_resource.py b/Lib/test/test_resource.py index 2450e78..25fea97 100644 --- a/Lib/test/test_resource.py +++ b/Lib/test/test_resource.py @@ -49,17 +49,24 @@ class ResourceTest(unittest.TestCase): except ValueError: limit_set = False f = open(test_support.TESTFN, "wb") - f.write("X" * 1024) try: - f.write("Y") - f.flush() - except IOError: - if not limit_set: - raise - f.close() - os.unlink(test_support.TESTFN) + f.write("X" * 1024) + try: + f.write("Y") + f.flush() + except IOError: + if not limit_set: + raise + if limit_set: + # Close will attempt to flush the byte we wrote + # Restore limit first to avoid getting a spurious error + resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max)) + finally: + f.close() + os.unlink(test_support.TESTFN) finally: - resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max)) + if limit_set: + resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max)) def test_fsize_toobig(self): # Be sure that setrlimit is checking for really large values diff --git a/Lib/test/test_runpy.py b/Lib/test/test_runpy.py index 868662d..d10e40f 100644 --- a/Lib/test/test_runpy.py +++ b/Lib/test/test_runpy.py @@ -21,12 +21,12 @@ class RunModuleCodeTest(unittest.TestCase): "# Check the sys module\n" "import sys\n" "run_argv0 = sys.argv[0]\n" - "if __name__ in sys.modules:\n" - " run_name = sys.modules[__name__].__name__\n" + "run_name_in_sys_modules = __name__ in sys.modules\n" + "if run_name_in_sys_modules:\n" + " module_in_sys_modules = globals() is sys.modules[__name__].__dict__\n" "# Check nested operation\n" "import runpy\n" - "nested = runpy._run_module_code('x=1\\n', mod_name='<run>',\n" - " alter_sys=True)\n" + "nested = runpy._run_module_code('x=1\\n', mod_name='<run>')\n" ) @@ -37,34 +37,44 @@ class RunModuleCodeTest(unittest.TestCase): loader = "Now you're just being silly" d1 = dict(initial=initial) saved_argv0 = sys.argv[0] - d2 = _run_module_code(self.test_source, - d1, - name, - file, - loader, - True) - self.failUnless("result" not in d1) - self.failUnless(d2["initial"] is initial) - self.assertEqual(d2["result"], self.expected_result) - self.assertEqual(d2["nested"]["x"], 1) - self.failUnless(d2["__name__"] is name) - self.failUnless(d2["run_name"] is name) - self.failUnless(d2["__file__"] is file) - self.failUnless(d2["run_argv0"] is file) - self.failUnless(d2["__loader__"] is loader) - self.failUnless(sys.argv[0] is saved_argv0) - self.failUnless(name not in sys.modules) + try: + d2 = _run_module_code(self.test_source, + d1, + name, + file, + loader, + alter_sys=True) + self.failUnless("result" not in d1) + self.failUnless(d2["initial"] is initial) + self.assertEqual(d2["result"], self.expected_result) + self.assertEqual(d2["nested"]["x"], 1) + self.assertEqual(d2["nested"]["__name__"], "<run>") + self.failUnless(d2["__name__"] is name) + self.failUnless(d2["__file__"] is file) + self.failUnless(d2["__loader__"] is loader) + self.failUnless(d2["run_argv0"] is file) + self.failUnless(d2["run_name_in_sys_modules"]) + self.failUnless(d2["module_in_sys_modules"]) + self.failUnless(sys.argv[0] is not saved_argv0) + self.failUnless(name in sys.modules) + finally: + sys.argv[0] = saved_argv0 + if name in sys.modules: + del sys.modules[name] def test_run_module_code_defaults(self): saved_argv0 = sys.argv[0] d = _run_module_code(self.test_source) self.assertEqual(d["result"], self.expected_result) + self.failUnless(d["nested"]["x"] == 1) + self.failUnless(d["nested"]["__name__"] == "<run>") self.failUnless(d["__name__"] is None) self.failUnless(d["__file__"] is None) self.failUnless(d["__loader__"] is None) self.failUnless(d["run_argv0"] is saved_argv0) - self.failUnless("run_name" not in d) + self.failUnless(not d["run_name_in_sys_modules"]) self.failUnless(sys.argv[0] is saved_argv0) + self.failUnless(None not in sys.modules) class RunModuleTest(unittest.TestCase): @@ -77,19 +87,29 @@ class RunModuleTest(unittest.TestCase): self.fail("Expected import error for " + mod_name) def test_invalid_names(self): + # Builtin module self.expect_import_error("sys") + # Non-existent modules self.expect_import_error("sys.imp.eric") self.expect_import_error("os.path.half") self.expect_import_error("a.bee") self.expect_import_error(".howard") self.expect_import_error("..eaten") + # Package + self.expect_import_error("logging") def test_library_module(self): run_module("runpy") + def _add_pkg_dir(self, pkg_dir): + os.mkdir(pkg_dir) + pkg_fname = os.path.join(pkg_dir, "__init__"+os.extsep+"py") + pkg_file = open(pkg_fname, "w") + pkg_file.close() + return pkg_fname + def _make_pkg(self, source, depth): pkg_name = "__runpy_pkg__" - init_fname = "__init__"+os.extsep+"py" test_fname = "runpy_test"+os.extsep+"py" pkg_dir = sub_dir = tempfile.mkdtemp() if verbose: print(" Package tree in:", sub_dir) @@ -97,11 +117,8 @@ class RunModuleTest(unittest.TestCase): if verbose: print(" Updated sys.path:", sys.path[0]) for i in range(depth): sub_dir = os.path.join(sub_dir, pkg_name) - os.mkdir(sub_dir) + pkg_fname = self._add_pkg_dir(sub_dir) if verbose: print(" Next level in:", sub_dir) - pkg_fname = os.path.join(sub_dir, init_fname) - pkg_file = open(pkg_fname, "w") - pkg_file.close() if verbose: print(" Created:", pkg_fname) mod_fname = os.path.join(sub_dir, test_fname) mod_file = open(mod_fname, "w") @@ -112,13 +129,9 @@ class RunModuleTest(unittest.TestCase): return pkg_dir, mod_fname, mod_name def _del_pkg(self, top, depth, mod_name): - for i in range(depth+1): # Don't forget the module itself - parts = mod_name.rsplit(".", i) - entry = parts[0] - try: + for entry in list(sys.modules): + if entry.startswith("__runpy_pkg__"): del sys.modules[entry] - except KeyError as ex: - if verbose: print(ex) # Persist with cleaning up if verbose: print(" Removed sys.modules entries") del sys.path[0] if verbose: print(" Removed sys.path entry") @@ -146,23 +159,81 @@ class RunModuleTest(unittest.TestCase): try: if verbose: print("Running from source:", mod_name) d1 = run_module(mod_name) # Read from source + self.failUnless("x" in d1) self.assertEqual(d1["x"], 1) del d1 # Ensure __loader__ entry doesn't keep file open __import__(mod_name) os.remove(mod_fname) if verbose: print("Running from compiled:", mod_name) d2 = run_module(mod_name) # Read from bytecode + self.failUnless("x" in d2) self.assertEqual(d2["x"], 1) del d2 # Ensure __loader__ entry doesn't keep file open finally: self._del_pkg(pkg_dir, depth, mod_name) if verbose: print("Module executed successfully") + def _add_relative_modules(self, base_dir, depth): + if depth <= 1: + raise ValueError("Relative module test needs depth > 1") + pkg_name = "__runpy_pkg__" + module_dir = base_dir + for i in range(depth): + parent_dir = module_dir + module_dir = os.path.join(module_dir, pkg_name) + # Add sibling module + sibling_fname = os.path.join(module_dir, "sibling"+os.extsep+"py") + sibling_file = open(sibling_fname, "w") + sibling_file.close() + if verbose: print(" Added sibling module:", sibling_fname) + # Add nephew module + uncle_dir = os.path.join(parent_dir, "uncle") + self._add_pkg_dir(uncle_dir) + if verbose: print(" Added uncle package:", uncle_dir) + cousin_dir = os.path.join(uncle_dir, "cousin") + self._add_pkg_dir(cousin_dir) + if verbose: print(" Added cousin package:", cousin_dir) + nephew_fname = os.path.join(cousin_dir, "nephew"+os.extsep+"py") + nephew_file = open(nephew_fname, "w") + nephew_file.close() + if verbose: print(" Added nephew module:", nephew_fname) + + def _check_relative_imports(self, depth, run_name=None): + contents = """\ +from __future__ import absolute_import +from . import sibling +from ..uncle.cousin import nephew +""" + pkg_dir, mod_fname, mod_name = ( + self._make_pkg(contents, depth)) + try: + self._add_relative_modules(pkg_dir, depth) + if verbose: print("Running from source:", mod_name) + d1 = run_module(mod_name) # Read from source + self.failUnless("sibling" in d1) + self.failUnless("nephew" in d1) + del d1 # Ensure __loader__ entry doesn't keep file open + __import__(mod_name) + os.remove(mod_fname) + if verbose: print("Running from compiled:", mod_name) + d2 = run_module(mod_name) # Read from bytecode + self.failUnless("sibling" in d2) + self.failUnless("nephew" in d2) + del d2 # Ensure __loader__ entry doesn't keep file open + finally: + self._del_pkg(pkg_dir, depth, mod_name) + if verbose: print("Module executed successfully") + def test_run_module(self): for depth in range(4): if verbose: print("Testing package depth:", depth) self._check_module(depth) + def test_explicit_relative_import(self): + for depth in range(2, 5): + if verbose: print("Testing relative imports at depth:", depth) + self._check_relative_imports(depth) + def test_main(): run_unittest(RunModuleCodeTest) diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 3542ddb..cf92662 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -1,53 +1,100 @@ +import asyncore import socket import threading +import smtpd import smtplib +import io +import sys import time +import select from unittest import TestCase from test import test_support +# PORT is used to communicate the port number assigned to the server +# to the test client +HOST = "localhost" +PORT = None -def server(evt): - serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - serv.settimeout(3) - serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - serv.bind(("", 9091)) - serv.listen(5) +def server(evt, buf): try: + serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv.settimeout(3) + serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + serv.bind(("", 0)) + global PORT + PORT = serv.getsockname()[1] + serv.listen(5) conn, addr = serv.accept() except socket.timeout: pass else: - conn.send("220 Hola mundo\n") + n = 500 + while buf and n > 0: + r, w, e = select.select([], [conn], []) + if w: + sent = conn.send(buf) + buf = buf[sent:] + + n -= 1 + time.sleep(0.01) + conn.close() finally: serv.close() + PORT = None evt.set() class GeneralTests(TestCase): def setUp(self): self.evt = threading.Event() - threading.Thread(target=server, args=(self.evt,)).start() - time.sleep(.1) + servargs = (self.evt, "220 Hola mundo\n") + threading.Thread(target=server, args=servargs).start() + + # wait until server thread has assigned a port number + n = 500 + while PORT is None and n > 0: + time.sleep(0.01) + n -= 1 + + # wait a little longer (sometimes connections are refused + # on slow machines without this additional wait) + time.sleep(0.5) def tearDown(self): self.evt.wait() - def testBasic(self): + def testBasic1(self): # connects - smtp = smtplib.SMTP("localhost", 9091) + smtp = smtplib.SMTP(HOST, PORT) + smtp.sock.close() + + def testBasic2(self): + # connects, include port in host name + smtp = smtplib.SMTP("%s:%s" % (HOST, PORT)) + smtp.sock.close() + + def testLocalHostName(self): + # check that supplied local_hostname is used + smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost") + self.assertEqual(smtp.local_hostname, "testhost") smtp.sock.close() + def testNonnumericPort(self): + # check that non-numeric port raises ValueError + self.assertRaises(socket.error, smtplib.SMTP, + "localhost", "bogus") + def testTimeoutDefault(self): # default - smtp = smtplib.SMTP("localhost", 9091) + smtp = smtplib.SMTP(HOST, PORT) self.assertTrue(smtp.sock.gettimeout() is None) smtp.sock.close() def testTimeoutValue(self): # a value - smtp = smtplib.SMTP("localhost", 9091, timeout=30) + smtp = smtplib.SMTP(HOST, PORT, timeout=30) self.assertEqual(smtp.sock.gettimeout(), 30) smtp.sock.close() @@ -56,16 +103,149 @@ class GeneralTests(TestCase): previous = socket.getdefaulttimeout() socket.setdefaulttimeout(30) try: - smtp = smtplib.SMTP("localhost", 9091, timeout=None) + smtp = smtplib.SMTP(HOST, PORT, timeout=None) finally: socket.setdefaulttimeout(previous) self.assertEqual(smtp.sock.gettimeout(), 30) smtp.sock.close() +# Test server using smtpd.DebuggingServer +def debugging_server(serv_evt, client_evt): + serv = smtpd.DebuggingServer(("", 0), ('nowhere', -1)) + global PORT + PORT = serv.getsockname()[1] + + try: + if hasattr(select, 'poll'): + poll_fun = asyncore.poll2 + else: + poll_fun = asyncore.poll + + n = 1000 + while asyncore.socket_map and n > 0: + poll_fun(0.01, asyncore.socket_map) + + # when the client conversation is finished, it will + # set client_evt, and it's then ok to kill the server + if client_evt.isSet(): + serv.close() + break + + n -= 1 + + except socket.timeout: + pass + finally: + # allow some time for the client to read the result + time.sleep(0.5) + serv.close() + asyncore.close_all() + PORT = None + time.sleep(0.5) + serv_evt.set() + +MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' +MSG_END = '------------ END MESSAGE ------------\n' + +# Test behavior of smtpd.DebuggingServer +# NOTE: the SMTP objects are created with a non-default local_hostname +# argument to the constructor, since (on some systems) the FQDN lookup +# caused by the default local_hostname sometimes takes so long that the +# test server times out, causing the test to fail. +class DebuggingServerTests(TestCase): + + def setUp(self): + # temporarily replace sys.stdout to capture DebuggingServer output + self.old_stdout = sys.stdout + self.output = io.StringIO() + sys.stdout = self.output + + self.serv_evt = threading.Event() + self.client_evt = threading.Event() + serv_args = (self.serv_evt, self.client_evt) + threading.Thread(target=debugging_server, args=serv_args).start() + + # wait until server thread has assigned a port number + n = 500 + while PORT is None and n > 0: + time.sleep(0.01) + n -= 1 + + # wait a little longer (sometimes connections are refused + # on slow machines without this additional wait) + time.sleep(0.5) + + def tearDown(self): + # indicate that the client is finished + self.client_evt.set() + # wait for the server thread to terminate + self.serv_evt.wait() + # restore sys.stdout + sys.stdout = self.old_stdout + + def testBasic(self): + # connect + smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) + smtp.quit() + + def testEHLO(self): + smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) + expected = (502, b'Error: command "EHLO" not implemented') + self.assertEqual(smtp.ehlo(), expected) + smtp.quit() + + def testHELP(self): + smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) + self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented') + smtp.quit() + + def testSend(self): + # connect and send mail + m = 'A test message' + smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) + smtp.sendmail('John', 'Sally', m) + smtp.quit() + + self.client_evt.set() + self.serv_evt.wait() + self.output.flush() + mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) + self.assertEqual(self.output.getvalue(), mexpect) + + +class BadHELOServerTests(TestCase): + + def setUp(self): + self.old_stdout = sys.stdout + self.output = io.StringIO() + sys.stdout = self.output + + self.evt = threading.Event() + servargs = (self.evt, b"199 no hello for you!\n") + threading.Thread(target=server, args=servargs).start() + + # wait until server thread has assigned a port number + n = 500 + while PORT is None and n > 0: + time.sleep(0.01) + n -= 1 + + # wait a little longer (sometimes connections are refused + # on slow machines without this additional wait) + time.sleep(0.5) + + def tearDown(self): + self.evt.wait() + sys.stdout = self.old_stdout + + def testFailingHELO(self): + self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, + HOST, PORT, 'localhost', 3) def test_main(verbose=None): - test_support.run_unittest(GeneralTests) + test_support.run_unittest(GeneralTests, DebuggingServerTests, + BadHELOServerTests) if __name__ == '__main__': test_main() diff --git a/Lib/test/test_socket_ssl.py b/Lib/test/test_socket_ssl.py index 42efb6e..fd4383f 100644 --- a/Lib/test/test_socket_ssl.py +++ b/Lib/test/test_socket_ssl.py @@ -106,6 +106,25 @@ class BasicTests(unittest.TestCase): connector() t.join() + def test_978833(self): + if test_support.verbose: + print("test_978833 ...") + + import os, httplib + with test_support.transient_internet(): + s = socket.socket(socket.AF_INET) + s.connect(("www.sf.net", 443)) + fd = s.fileno() + sock = httplib.FakeSocket(s, socket.ssl(s)) + s = None + sock.close() + try: + os.fstat(fd) + except OSError: + pass + else: + raise test_support.TestFailed("Failed to close socket") + class OpenSSLTests(unittest.TestCase): def testBasic(self): diff --git a/Lib/test/test_unicodedata.py b/Lib/test/test_unicodedata.py index 95706b2..91c4700 100644 --- a/Lib/test/test_unicodedata.py +++ b/Lib/test/test_unicodedata.py @@ -214,6 +214,9 @@ class UnicodeMiscTest(UnicodeDatabaseTest): count += 1 self.assert_(count >= 10) # should have tested at least the ASCII digits + def test_bug_1704793(self): + self.assertEquals(self.db.lookup("GOTHIC LETTER FAIHU"), '\U00010346') + def test_main(): test.test_support.run_unittest( UnicodeMiscTest, diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py index 737ecbd..a126262 100644 --- a/Lib/test/test_urllib2_localnet.py +++ b/Lib/test/test_urllib2_localnet.py @@ -40,14 +40,16 @@ class LoopbackHttpServer(BaseHTTPServer.HTTPServer): class LoopbackHttpServerThread(threading.Thread): """Stoppable thread that runs a loopback http server.""" - def __init__(self, port, RequestHandlerClass): + def __init__(self, request_handler): threading.Thread.__init__(self) - self._RequestHandlerClass = RequestHandlerClass self._stop = False - self._port = port - self._server_address = ('127.0.0.1', self._port) self.ready = threading.Event() - self.error = None + request_handler.protocol_version = "HTTP/1.0" + self.httpd = LoopbackHttpServer(('127.0.0.1', 0), + request_handler) + #print "Serving HTTP on %s port %s" % (self.httpd.server_name, + # self.httpd.server_port) + self.port = self.httpd.server_port def stop(self): """Stops the webserver if it's currently running.""" @@ -58,24 +60,9 @@ class LoopbackHttpServerThread(threading.Thread): self.join() def run(self): - protocol = "HTTP/1.0" - - try: - self._RequestHandlerClass.protocol_version = protocol - httpd = LoopbackHttpServer(self._server_address, - self._RequestHandlerClass) - - sa = httpd.socket.getsockname() - #print "Serving HTTP on", sa[0], "port", sa[1], "..." - except: - # Fail "gracefully" if we are unable to start. - self.ready.set() - self.error = sys.exc_info()[1] - raise - self.ready.set() while not self._stop: - httpd.handle_request() + self.httpd.handle_request() # Authentication infrastructure @@ -232,26 +219,21 @@ class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): class ProxyAuthTests(unittest.TestCase): URL = "http://www.foo.com" - PORT = 8080 USER = "tester" PASSWD = "test123" REALM = "TestRealm" - PROXY_URL = "http://127.0.0.1:%d" % PORT - def setUp(self): FakeProxyHandler.digest_auth_handler.set_users({ self.USER : self.PASSWD }) FakeProxyHandler.digest_auth_handler.set_realm(self.REALM) - self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler) + self.server = LoopbackHttpServerThread(FakeProxyHandler) self.server.start() self.server.ready.wait() - if self.server.error: - raise self.server.error - - handler = urllib2.ProxyHandler({"http" : self.PROXY_URL}) + proxy_url = "http://127.0.0.1:%d" % self.server.port + handler = urllib2.ProxyHandler({"http" : proxy_url}) self._digest_auth_handler = urllib2.ProxyDigestAuthHandler() self.opener = urllib2.build_opener(handler, self._digest_auth_handler) |