diff options
Diffstat (limited to 'Lib/test/test_poplib.py')
-rw-r--r-- | Lib/test/test_poplib.py | 351 |
1 files changed, 86 insertions, 265 deletions
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index 7b1d854..d214375 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -8,28 +8,18 @@ import asyncore import asynchat import socket import os +import time import errno -import threading from unittest import TestCase, skipUnless -from test import support as test_support +from test import test_support +from test.test_support import HOST +threading = test_support.import_module('threading') -HOST = test_support.HOST -PORT = 0 - -SUPPORTS_SSL = False -if hasattr(poplib, 'POP3_SSL'): - import ssl - - SUPPORTS_SSL = True - CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") - CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") - -requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') # the dummy data returned by server when LIST and RETR commands are issued -LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' -RETR_RESP = b"""From: postmaster@python.org\ +LIST_RESP = '1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' +RETR_RESP = """From: postmaster@python.org\ \r\nContent-Type: text/plain\r\n\ MIME-Version: 1.0\r\n\ Subject: Dummy\r\n\ @@ -42,23 +32,17 @@ line3\r\n\ class DummyPOP3Handler(asynchat.async_chat): - CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']} - enable_UTF8 = False - def __init__(self, conn): asynchat.async_chat.__init__(self, conn) - self.set_terminator(b"\r\n") + self.set_terminator("\r\n") self.in_buffer = [] - self.push('+OK dummy pop3 server ready. <timestamp>') - self.tls_active = False - self.tls_starting = False + self.push('+OK dummy pop3 server ready.') def collect_incoming_data(self, data): self.in_buffer.append(data) def found_terminator(self): - line = b''.join(self.in_buffer) - line = str(line, 'ISO-8859-1') + line = ''.join(self.in_buffer) self.in_buffer = [] cmd = line.split(' ')[0].lower() space = line.find(' ') @@ -76,7 +60,7 @@ class DummyPOP3Handler(asynchat.async_chat): raise def push(self, data): - asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') + asynchat.async_chat.push(self, data + '\r\n') def cmd_echo(self, arg): # sends back the received string (used by the test suite) @@ -97,7 +81,7 @@ class DummyPOP3Handler(asynchat.async_chat): def cmd_list(self, arg): if arg: - self.push('+OK %s %s' % (arg, arg)) + self.push('+OK %s %s' %(arg, arg)) else: self.push('+OK') asynchat.async_chat.push(self, LIST_RESP) @@ -119,83 +103,6 @@ class DummyPOP3Handler(asynchat.async_chat): def cmd_rpop(self, arg): self.push('+OK done nothing.') - def cmd_apop(self, arg): - self.push('+OK done nothing.') - - def cmd_quit(self, arg): - self.push('+OK closing.') - self.close_when_done() - - def _get_capas(self): - _capas = dict(self.CAPAS) - if not self.tls_active and SUPPORTS_SSL: - _capas['STLS'] = [] - return _capas - - def cmd_capa(self, arg): - self.push('+OK Capability list follows') - if self._get_capas(): - for cap, params in self._get_capas().items(): - _ln = [cap] - if params: - _ln.extend(params) - self.push(' '.join(_ln)) - self.push('.') - - def cmd_utf8(self, arg): - self.push('+OK I know RFC6856' - if self.enable_UTF8 - else '-ERR What is UTF8?!') - - if SUPPORTS_SSL: - - def cmd_stls(self, arg): - if self.tls_active is False: - self.push('+OK Begin TLS negotiation') - context = ssl.SSLContext() - context.load_cert_chain(CERTFILE) - tls_sock = context.wrap_socket(self.socket, - server_side=True, - do_handshake_on_connect=False, - suppress_ragged_eofs=False) - self.del_channel() - self.set_socket(tls_sock) - self.tls_active = True - self.tls_starting = True - self.in_buffer = [] - self._do_tls_handshake() - else: - self.push('-ERR Command not permitted when TLS active') - - def _do_tls_handshake(self): - try: - self.socket.do_handshake() - except ssl.SSLError as err: - if err.args[0] in (ssl.SSL_ERROR_WANT_READ, - ssl.SSL_ERROR_WANT_WRITE): - return - elif err.args[0] == ssl.SSL_ERROR_EOF: - return self.handle_close() - # TODO: SSLError does not expose alert information - elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or - "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]): - return self.handle_close() - raise - except OSError as err: - if err.args[0] == errno.ECONNABORTED: - return self.handle_close() - else: - self.tls_active = True - self.tls_starting = False - - def handle_read(self): - if self.tls_starting: - self._do_tls_handshake() - else: - try: - asynchat.async_chat.handle_read(self) - except ssl.SSLEOFError: - self.handle_close() class DummyPOP3Server(asyncore.dispatcher, threading.Thread): @@ -204,14 +111,12 @@ class DummyPOP3Server(asyncore.dispatcher, threading.Thread): def __init__(self, address, af=socket.AF_INET): threading.Thread.__init__(self) asyncore.dispatcher.__init__(self) - self.daemon = True self.create_socket(af, socket.SOCK_STREAM) self.bind(address) self.listen(5) self.active = False self.active_lock = threading.Lock() self.host, self.port = self.socket.getsockname()[:2] - self.handler_instance = None def start(self): assert not self.active @@ -222,20 +127,21 @@ class DummyPOP3Server(asyncore.dispatcher, threading.Thread): def run(self): self.active = True self.__flag.set() - try: - while self.active and asyncore.socket_map: - with self.active_lock: - asyncore.loop(timeout=0.1, count=1) - finally: - asyncore.close_all(ignore_all=True) + while self.active and asyncore.socket_map: + self.active_lock.acquire() + asyncore.loop(timeout=0.1, count=1) + self.active_lock.release() + asyncore.close_all(ignore_all=True) def stop(self): assert self.active self.active = False self.join() - def handle_accepted(self, conn, addr): - self.handler_instance = self.handler(conn) + def handle_accept(self): + conn, addr = self.accept() + self.handler = self.handler(conn) + self.close() def handle_connect(self): self.close() @@ -249,23 +155,21 @@ class DummyPOP3Server(asyncore.dispatcher, threading.Thread): class TestPOP3Class(TestCase): + def assertOK(self, resp): - self.assertTrue(resp.startswith(b"+OK")) + self.assertTrue(resp.startswith("+OK")) def setUp(self): - self.server = DummyPOP3Server((HOST, PORT)) + self.server = DummyPOP3Server((HOST, 0)) self.server.start() - self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) + self.client = poplib.POP3(self.server.host, self.server.port) def tearDown(self): - self.client.close() + self.client.quit() self.server.stop() - # Explicitly clear the attribute to prevent dangling thread - self.server = None def test_getwelcome(self): - self.assertEqual(self.client.getwelcome(), - b'+OK dummy pop3 server ready. <timestamp>') + self.assertEqual(self.client.getwelcome(), '+OK dummy pop3 server ready.') def test_exceptions(self): self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') @@ -283,18 +187,16 @@ class TestPOP3Class(TestCase): def test_list(self): self.assertEqual(self.client.list()[1:], - ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'], - 25)) - self.assertTrue(self.client.list('1').endswith(b"OK 1 1")) + (['1 1', '2 2', '3 3', '4 4', '5 5'], 25)) + self.assertTrue(self.client.list('1').endswith("OK 1 1")) def test_retr(self): - expected = (b'+OK 116 bytes', - [b'From: postmaster@python.org', b'Content-Type: text/plain', - b'MIME-Version: 1.0', b'Subject: Dummy', - b'', b'line1', b'line2', b'line3'], + expected = ('+OK 116 bytes', + ['From: postmaster@python.org', 'Content-Type: text/plain', + 'MIME-Version: 1.0', 'Subject: Dummy', + '', 'line1', 'line2', 'line3'], 113) - foo = self.client.retr('foo') - self.assertEqual(foo, expected) + self.assertEqual(self.client.retr('foo'), expected) def test_too_long_lines(self): self.assertRaises(poplib.error_proto, self.client._shortcmd, @@ -309,11 +211,6 @@ class TestPOP3Class(TestCase): def test_rpop(self): self.assertOK(self.client.rpop('foo')) - @test_support.requires_hashdigest('md5') - def test_apop_normal(self): - self.assertOK(self.client.apop('foo', 'dummypassword')) - - @test_support.requires_hashdigest('md5') def test_apop_REDOS(self): # Replace welcome with very long evil welcome. # NB The upper bound on welcome length is currently 2048. @@ -325,10 +222,10 @@ class TestPOP3Class(TestCase): self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb') def test_top(self): - expected = (b'+OK 116 bytes', - [b'From: postmaster@python.org', b'Content-Type: text/plain', - b'MIME-Version: 1.0', b'Subject: Dummy', b'', - b'line1', b'line2', b'line3'], + expected = ('+OK 116 bytes', + ['From: postmaster@python.org', 'Content-Type: text/plain', + 'MIME-Version: 1.0', 'Subject: Dummy', '', + 'line1', 'line2', 'line3'], 113) self.assertEqual(self.client.top(1, 1), expected) @@ -336,72 +233,58 @@ class TestPOP3Class(TestCase): self.client.uidl() self.client.uidl('foo') - def test_utf8_raises_if_unsupported(self): - self.server.handler.enable_UTF8 = False - self.assertRaises(poplib.error_proto, self.client.utf8) - - def test_utf8(self): - self.server.handler.enable_UTF8 = True - expected = b'+OK I know RFC6856' - result = self.client.utf8() - self.assertEqual(result, expected) - - def test_capa(self): - capa = self.client.capa() - self.assertTrue('IMPLEMENTATION' in capa.keys()) - - def test_quit(self): - resp = self.client.quit() - self.assertTrue(resp) - self.assertIsNone(self.client.sock) - self.assertIsNone(self.client.file) - - @requires_ssl - def test_stls_capa(self): - capa = self.client.capa() - self.assertTrue('STLS' in capa.keys()) - - @requires_ssl - def test_stls(self): - expected = b'+OK Begin TLS negotiation' - resp = self.client.stls() - self.assertEqual(resp, expected) - - @requires_ssl - def test_stls_context(self): - expected = b'+OK Begin TLS negotiation' - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ctx.load_verify_locations(CAFILE) - self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) - self.assertEqual(ctx.check_hostname, True) - with self.assertRaises(ssl.CertificateError): - resp = self.client.stls(context=ctx) - self.client = poplib.POP3("localhost", self.server.port, timeout=3) - resp = self.client.stls(context=ctx) - self.assertEqual(resp, expected) - - -if SUPPORTS_SSL: - from test.test_ftplib import SSLConnection - - class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler): + +SUPPORTS_SSL = False +if hasattr(poplib, 'POP3_SSL'): + import ssl + + SUPPORTS_SSL = True + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem") + + class DummyPOP3_SSLHandler(DummyPOP3Handler): def __init__(self, conn): asynchat.async_chat.__init__(self, conn) - self.secure_connection() - self.set_terminator(b"\r\n") + self.socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, + server_side=True, + do_handshake_on_connect=False) + # Must try handshake before calling push() + self._ssl_accepting = True + self._do_ssl_handshake() + self.set_terminator("\r\n") self.in_buffer = [] - self.push('+OK dummy pop3 server ready. <timestamp>') - self.tls_active = True - self.tls_starting = False + self.push('+OK dummy pop3 server ready.') + def _do_ssl_handshake(self): + try: + self.socket.do_handshake() + except ssl.SSLError, err: + if err.args[0] in (ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + return + elif err.args[0] == ssl.SSL_ERROR_EOF: + return self.handle_close() + raise + except socket.error, err: + if err.args[0] == errno.ECONNABORTED: + return self.handle_close() + else: + self._ssl_accepting = False + + def handle_read(self): + if self._ssl_accepting: + self._do_ssl_handshake() + else: + DummyPOP3Handler.handle_read(self) + +requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') @requires_ssl class TestPOP3_SSLClass(TestPOP3Class): # repeat previous tests by using poplib.POP3_SSL def setUp(self): - self.server = DummyPOP3Server((HOST, PORT)) + self.server = DummyPOP3Server((HOST, 0)) self.server.handler = DummyPOP3_SSLHandler self.server.start() self.client = poplib.POP3_SSL(self.server.host, self.server.port) @@ -409,67 +292,6 @@ class TestPOP3_SSLClass(TestPOP3Class): def test__all__(self): self.assertIn('POP3_SSL', poplib.__all__) - def test_context(self): - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, keyfile=CERTFILE, context=ctx) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, certfile=CERTFILE, context=ctx) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, keyfile=CERTFILE, - certfile=CERTFILE, context=ctx) - - self.client.quit() - self.client = poplib.POP3_SSL(self.server.host, self.server.port, - context=ctx) - self.assertIsInstance(self.client.sock, ssl.SSLSocket) - self.assertIs(self.client.sock.context, ctx) - self.assertTrue(self.client.noop().startswith(b'+OK')) - - def test_stls(self): - self.assertRaises(poplib.error_proto, self.client.stls) - - test_stls_context = test_stls - - def test_stls_capa(self): - capa = self.client.capa() - self.assertFalse('STLS' in capa.keys()) - - -@requires_ssl -class TestPOP3_TLSClass(TestPOP3Class): - # repeat previous tests by using poplib.POP3.stls() - - def setUp(self): - self.server = DummyPOP3Server((HOST, PORT)) - self.server.start() - self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) - self.client.stls() - - def tearDown(self): - if self.client.file is not None and self.client.sock is not None: - try: - self.client.quit() - except poplib.error_proto: - # happens in the test_too_long_lines case; the overlong - # response will be treated as response to QUIT and raise - # this exception - self.client.close() - self.server.stop() - # Explicitly clear the attribute to prevent dangling thread - self.server = None - - def test_stls(self): - self.assertRaises(poplib.error_proto, self.client.stls) - - test_stls_context = test_stls - - def test_stls_capa(self): - capa = self.client.capa() - self.assertFalse(b'STLS' in capa.keys()) - class TestTimeouts(TestCase): @@ -479,21 +301,20 @@ class TestTimeouts(TestCase): self.sock.settimeout(60) # Safety net. Look issue 11812 self.port = test_support.bind_port(self.sock) self.thread = threading.Thread(target=self.server, args=(self.evt,self.sock)) - self.thread.daemon = True + self.thread.setDaemon(True) self.thread.start() self.evt.wait() def tearDown(self): self.thread.join() - # Explicitly clear the attribute to prevent dangling thread - self.thread = None + del self.thread # Clear out any dangling Thread objects. def server(self, evt, serv): - serv.listen() + serv.listen(5) evt.set() try: conn, addr = serv.accept() - conn.send(b"+ Hola mundo\n") + conn.send("+ Hola mundo\n") conn.close() except socket.timeout: pass @@ -508,7 +329,7 @@ class TestTimeouts(TestCase): finally: socket.setdefaulttimeout(None) self.assertEqual(pop.sock.gettimeout(), 30) - pop.close() + pop.sock.close() def testTimeoutNone(self): self.assertIsNone(socket.getdefaulttimeout()) @@ -518,17 +339,17 @@ class TestTimeouts(TestCase): finally: socket.setdefaulttimeout(None) self.assertIsNone(pop.sock.gettimeout()) - pop.close() + pop.sock.close() def testTimeoutValue(self): pop = poplib.POP3(HOST, self.port, timeout=30) self.assertEqual(pop.sock.gettimeout(), 30) - pop.close() + pop.sock.close() def test_main(): tests = [TestPOP3Class, TestTimeouts, - TestPOP3_SSLClass, TestPOP3_TLSClass] + TestPOP3_SSLClass] thread_info = test_support.threading_setup() try: test_support.run_unittest(*tests) |