diff options
Diffstat (limited to 'Lib/test/test_imaplib.py')
| -rw-r--r-- | Lib/test/test_imaplib.py | 554 |
1 files changed, 492 insertions, 62 deletions
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 838411e..6e4a90f 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -10,17 +10,20 @@ import os.path import socketserver import time import calendar +import inspect -from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale +from test.support import (reap_threads, verbose, transient_internet, + run_with_tz, run_with_locale) import unittest +from unittest import mock from datetime import datetime, timezone, timedelta try: import ssl except ImportError: ssl = None -CERTFILE = None -CAFILE = None +CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") +CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") class TestImaplib(unittest.TestCase): @@ -41,17 +44,15 @@ class TestImaplib(unittest.TestCase): def test_Internaldate2tuple_issue10941(self): self.assertNotEqual(imaplib.Internaldate2tuple( b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'), - imaplib.Internaldate2tuple( - b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) - - + imaplib.Internaldate2tuple( + b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) def timevalues(self): return [2000000000, 2000000000.0, time.localtime(2000000000), (2033, 5, 18, 5, 33, 20, -1, -1, -1), (2033, 5, 18, 5, 33, 20, -1, -1, 1), datetime.fromtimestamp(2000000000, - timezone(timedelta(0, 2*60*60))), + timezone(timedelta(0, 2 * 60 * 60))), '"18-May-2033 05:33:20 +0200"'] @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') @@ -74,7 +75,6 @@ class TestImaplib(unittest.TestCase): if ssl: - class SecureTCPServer(socketserver.TCPServer): def get_request(self): @@ -95,13 +95,17 @@ else: class SimpleIMAPHandler(socketserver.StreamRequestHandler): - timeout = 1 continuation = None capabilities = '' + def setup(self): + super().setup() + self.server.logged = None + def _send(self, message): - if verbose: print("SENT: %r" % message.strip()) + if verbose: + print("SENT: %r" % message.strip()) self.wfile.write(message) def _send_line(self, message): @@ -134,7 +138,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): if line.endswith(b'\r\n'): break - if verbose: print('GOT: %r' % line.strip()) + if verbose: + print('GOT: %r' % line.strip()) if self.continuation: try: self.continuation.send(line) @@ -146,8 +151,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): cmd = splitline[1] args = splitline[2:] - if hasattr(self, 'cmd_'+cmd): - continuation = getattr(self, 'cmd_'+cmd)(tag, args) + if hasattr(self, 'cmd_' + cmd): + continuation = getattr(self, 'cmd_' + cmd)(tag, args) if continuation: self.continuation = continuation next(continuation) @@ -155,16 +160,353 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): self._send_tagged(tag, 'BAD', cmd + ' unknown') def cmd_CAPABILITY(self, tag, args): - caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1' + caps = ('IMAP4rev1 ' + self.capabilities + if self.capabilities + else 'IMAP4rev1') self._send_textline('* CAPABILITY ' + caps) self._send_tagged(tag, 'OK', 'CAPABILITY completed') def cmd_LOGOUT(self, tag, args): + self.server.logged = None self._send_textline('* BYE IMAP4ref1 Server logging out') self._send_tagged(tag, 'OK', 'LOGOUT completed') + def cmd_LOGIN(self, tag, args): + self.server.logged = args[0] + self._send_tagged(tag, 'OK', 'LOGIN completed') + + +class NewIMAPTestsMixin(): + client = None + + def _setup(self, imap_handler, connect=True): + """ + Sets up imap_handler for tests. imap_handler should inherit from either: + - SimpleIMAPHandler - for testing IMAP commands, + - socketserver.StreamRequestHandler - if raw access to stream is needed. + Returns (client, server). + """ + class TestTCPServer(self.server_class): + def handle_error(self, request, client_address): + """ + End request and raise the error if one occurs. + """ + self.close_request(request) + self.server_close() + raise + + self.addCleanup(self._cleanup) + self.server = self.server_class((support.HOST, 0), imap_handler) + self.thread = threading.Thread( + name=self._testMethodName+'-server', + target=self.server.serve_forever, + # Short poll interval to make the test finish quickly. + # Time between requests is short enough that we won't wake + # up spuriously too many times. + kwargs={'poll_interval': 0.01}) + self.thread.daemon = True # In case this function raises. + self.thread.start() + + if connect: + self.client = self.imap_class(*self.server.server_address) + + return self.client, self.server + + def _cleanup(self): + """ + Cleans up the test server. This method should not be called manually, + it is added to the cleanup queue in the _setup method already. + """ + # if logout was called already we'd raise an exception trying to + # shutdown the client once again + if self.client is not None and self.client.state != 'LOGOUT': + self.client.shutdown() + # cleanup the server + self.server.shutdown() + self.server.server_close() + self.thread.join(3.0) + + def test_EOF_without_complete_welcome_message(self): + # http://bugs.python.org/issue5949 + class EOFHandler(socketserver.StreamRequestHandler): + def handle(self): + self.wfile.write(b'* OK') + _, server = self._setup(EOFHandler, connect=False) + self.assertRaises(imaplib.IMAP4.abort, self.imap_class, + *server.server_address) -class BaseThreadedNetworkedTests(unittest.TestCase): + def test_line_termination(self): + class BadNewlineHandler(SimpleIMAPHandler): + def cmd_CAPABILITY(self, tag, args): + self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') + self._send_tagged(tag, 'OK', 'CAPABILITY completed') + _, server = self._setup(BadNewlineHandler, connect=False) + self.assertRaises(imaplib.IMAP4.abort, self.imap_class, + *server.server_address) + + def test_enable_raises_error_if_not_AUTH(self): + class EnableHandler(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + client, _ = self._setup(EnableHandler) + self.assertFalse(client.utf8_enabled) + with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'): + client.enable('foo') + self.assertFalse(client.utf8_enabled) + + def test_enable_raises_error_if_no_capability(self): + client, _ = self._setup(SimpleIMAPHandler) + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'does not support ENABLE'): + client.enable('foo') + + def test_enable_UTF8_raises_error_if_not_supported(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'does not support ENABLE'): + client.enable('UTF8=ACCEPT') + + def test_enable_UTF8_True_append(self): + class UTF8AppendServer(SimpleIMAPHandler): + capabilities = 'ENABLE UTF8=ACCEPT' + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + def cmd_APPEND(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'okay') + client, server = self._setup(UTF8AppendServer) + self.assertEqual(client._encoding, 'ascii') + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + code, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(code, 'OK') + self.assertEqual(client._encoding, 'utf-8') + msg_string = 'Subject: üñí©öðé' + typ, data = client.append(None, None, None, msg_string.encode('utf-8')) + self.assertEqual(typ, 'OK') + self.assertEqual(server.response, + ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')) + + def test_search_disallows_charset_in_utf8_mode(self): + class UTF8Server(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + client, _ = self._setup(UTF8Server) + typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(typ, 'OK') + typ, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(typ, 'OK') + self.assertTrue(client.utf8_enabled) + with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'): + client.search('foo', 'bar') + + def test_bad_auth_name(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_tagged(tag, 'NO', + 'unrecognized authentication type {}'.format(args[0])) + client, _ = self._setup(MyServer) + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'unrecognized authentication type METHOD'): + client.authenticate('METHOD', lambda: 1) + + def test_invalid_authentication(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.response = yield + self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') + client, _ = self._setup(MyServer) + with self.assertRaisesRegex(imaplib.IMAP4.error, + r'\[AUTHENTICATIONFAILED\] invalid'): + client.authenticate('MYAUTH', lambda x: b'fake') + + def test_valid_authentication_bytes(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + client, server = self._setup(MyServer) + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + + def test_valid_authentication_plain_text(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + client, server = self._setup(MyServer) + code, _ = client.authenticate('MYAUTH', lambda x: 'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + + def test_login_cram_md5_bytes(self): + class AuthHandler(SimpleIMAPHandler): + capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' + 'VzdG9uLm1jaS5uZXQ=') + r = yield + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): + self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') + else: + self._send_tagged(tag, 'NO', 'No access') + client, _ = self._setup(AuthHandler) + self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) + ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf") + self.assertEqual(ret, "OK") + + def test_login_cram_md5_plain_text(self): + class AuthHandler(SimpleIMAPHandler): + capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' + 'VzdG9uLm1jaS5uZXQ=') + r = yield + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): + self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') + else: + self._send_tagged(tag, 'NO', 'No access') + client, _ = self._setup(AuthHandler) + self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) + ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf") + self.assertEqual(ret, "OK") + + def test_aborted_authentication(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.response = yield + if self.response == b'*\r\n': + self._send_tagged( + tag, + 'NO', + '[AUTHENTICATIONFAILED] aborted') + else: + self._send_tagged(tag, 'OK', 'MYAUTH successful') + client, _ = self._setup(MyServer) + with self.assertRaisesRegex(imaplib.IMAP4.error, + r'\[AUTHENTICATIONFAILED\] aborted'): + client.authenticate('MYAUTH', lambda x: None) + + @mock.patch('imaplib._MAXLINE', 10) + def test_linetoolong(self): + class TooLongHandler(SimpleIMAPHandler): + def handle(self): + # send response line longer than the limit set in the next line + self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n') + _, server = self._setup(TooLongHandler, connect=False) + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'got more than 10 bytes'): + self.imap_class(*server.server_address) + + def test_simple_with_statement(self): + _, server = self._setup(SimpleIMAPHandler, connect=False) + with self.imap_class(*server.server_address): + pass + + def test_with_statement(self): + _, server = self._setup(SimpleIMAPHandler, connect=False) + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + self.assertIsNone(server.logged) + + def test_with_statement_logout(self): + # It is legal to log out explicitly inside the with block + _, server = self._setup(SimpleIMAPHandler, connect=False) + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + imap.logout() + self.assertIsNone(server.logged) + self.assertIsNone(server.logged) + + # command tests + + def test_login(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + self.assertEqual(client.state, 'AUTH') + + def test_logout(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + typ, data = client.logout() + self.assertEqual(typ, 'BYE') + self.assertEqual(data[0], b'IMAP4ref1 Server logging out') + self.assertEqual(client.state, 'LOGOUT') + + def test_lsub(self): + class LsubCmd(SimpleIMAPHandler): + def cmd_LSUB(self, tag, args): + self._send_textline('* LSUB () "." directoryA') + return self._send_tagged(tag, 'OK', 'LSUB completed') + client, _ = self._setup(LsubCmd) + client.login('user', 'pass') + typ, data = client.lsub() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'() "." directoryA') + + +class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase): + imap_class = imaplib.IMAP4 + server_class = socketserver.TCPServer + + +@unittest.skipUnless(ssl, "SSL not available") +class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase): + imap_class = imaplib.IMAP4_SSL + server_class = SecureTCPServer + + def test_ssl_raises(self): + ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_verify_locations(CAFILE) + + with self.assertRaisesRegex(ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): + _, server = self._setup(SimpleIMAPHandler) + client = self.imap_class(*server.server_address, + ssl_context=ssl_context) + client.shutdown() + + def test_ssl_verified(self): + ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_verify_locations(CAFILE) + + _, server = self._setup(SimpleIMAPHandler) + client = self.imap_class("localhost", server.server_address[1], + ssl_context=ssl_context) + client.shutdown() + +class ThreadedNetworkedTests(unittest.TestCase): + server_class = socketserver.TCPServer + imap_class = imaplib.IMAP4 def make_server(self, addr, hdlr): @@ -174,7 +516,8 @@ class BaseThreadedNetworkedTests(unittest.TestCase): self.server_close() raise - if verbose: print("creating server") + if verbose: + print("creating server") server = MyServer(addr, hdlr) self.assertEqual(server.server_address, server.socket.getsockname()) @@ -190,18 +533,21 @@ class BaseThreadedNetworkedTests(unittest.TestCase): # Short poll interval to make the test finish quickly. # Time between requests is short enough that we won't wake # up spuriously too many times. - kwargs={'poll_interval':0.01}) + kwargs={'poll_interval': 0.01}) t.daemon = True # In case this function raises. t.start() - if verbose: print("server running") + if verbose: + print("server running") return server, t def reap_server(self, server, thread): - if verbose: print("waiting for server") + if verbose: + print("waiting for server") server.shutdown() server.server_close() thread.join() - if verbose: print("done") + if verbose: + print("done") @contextmanager def reaped_server(self, hdlr): @@ -251,6 +597,84 @@ class BaseThreadedNetworkedTests(unittest.TestCase): self.assertRaises(imaplib.IMAP4.abort, self.imap_class, *server.server_address) + class UTF8Server(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + + @reap_threads + def test_enable_raises_error_if_not_AUTH(self): + with self.reaped_pair(self.UTF8Server) as (server, client): + self.assertFalse(client.utf8_enabled) + self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') + self.assertFalse(client.utf8_enabled) + + # XXX Also need a test that enable after SELECT raises an error. + + @reap_threads + def test_enable_raises_error_if_no_capability(self): + class NoEnableServer(self.UTF8Server): + capabilities = 'AUTH' + with self.reaped_pair(NoEnableServer) as (server, client): + self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') + + @reap_threads + def test_enable_UTF8_raises_error_if_not_supported(self): + class NonUTF8Server(SimpleIMAPHandler): + pass + with self.assertRaises(imaplib.IMAP4.error): + with self.reaped_pair(NonUTF8Server) as (server, client): + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + client.enable('UTF8=ACCEPT') + pass + + @reap_threads + def test_enable_UTF8_True_append(self): + + class UTF8AppendServer(self.UTF8Server): + def cmd_APPEND(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'okay') + + with self.reaped_pair(UTF8AppendServer) as (server, client): + self.assertEqual(client._encoding, 'ascii') + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, + b'ZmFrZQ==\r\n') # b64 encoded 'fake' + code, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(code, 'OK') + self.assertEqual(client._encoding, 'utf-8') + msg_string = 'Subject: üñí©öðé' + typ, data = client.append( + None, None, None, msg_string.encode('utf-8')) + self.assertEqual(typ, 'OK') + self.assertEqual( + server.response, + ('UTF8 (%s)\r\n' % msg_string).encode('utf-8') + ) + + # XXX also need a test that makes sure that the Literal and Untagged_status + # regexes uses unicode in UTF8 mode instead of the default ASCII. + + @reap_threads + def test_search_disallows_charset_in_utf8_mode(self): + with self.reaped_pair(self.UTF8Server) as (server, client): + typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(typ, 'OK') + typ, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(typ, 'OK') + self.assertTrue(client.utf8_enabled) + self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar') + @reap_threads def test_bad_auth_name(self): @@ -258,7 +682,7 @@ class BaseThreadedNetworkedTests(unittest.TestCase): def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'NO', 'unrecognized authentication ' - 'type {}'.format(args[0])) + 'type {}'.format(args[0])) with self.reaped_pair(MyServer) as (server, client): with self.assertRaises(imaplib.IMAP4.error): @@ -292,13 +716,13 @@ class BaseThreadedNetworkedTests(unittest.TestCase): code, data = client.authenticate('MYAUTH', lambda x: b'fake') self.assertEqual(code, 'OK') self.assertEqual(server.response, - b'ZmFrZQ==\r\n') #b64 encoded 'fake' + b'ZmFrZQ==\r\n') # b64 encoded 'fake' with self.reaped_pair(MyServer) as (server, client): code, data = client.authenticate('MYAUTH', lambda x: 'fake') self.assertEqual(code, 'OK') self.assertEqual(server.response, - b'ZmFrZQ==\r\n') #b64 encoded 'fake' + b'ZmFrZQ==\r\n') # b64 encoded 'fake' @reap_threads def test_login_cram_md5(self): @@ -309,9 +733,10 @@ class BaseThreadedNetworkedTests(unittest.TestCase): def cmd_AUTHENTICATE(self, tag, args): self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' - 'VzdG9uLm1jaS5uZXQ=') + 'VzdG9uLm1jaS5uZXQ=') r = yield - if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n': + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') else: self._send_tagged(tag, 'NO', 'No access') @@ -327,7 +752,6 @@ class BaseThreadedNetworkedTests(unittest.TestCase): self.assertEqual(ret, "OK") - @reap_threads def test_aborted_authentication(self): @@ -346,26 +770,46 @@ class BaseThreadedNetworkedTests(unittest.TestCase): with self.assertRaises(imaplib.IMAP4.error): code, data = client.authenticate('MYAUTH', lambda x: None) + def test_linetoolong(self): class TooLongHandler(SimpleIMAPHandler): def handle(self): # Send a very long response line - self.wfile.write(b'* OK ' + imaplib._MAXLINE*b'x' + b'\r\n') + self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') with self.reaped_server(TooLongHandler) as server: self.assertRaises(imaplib.IMAP4.error, self.imap_class, *server.server_address) + @reap_threads + def test_simple_with_statement(self): + # simplest call + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address): + pass -class ThreadedNetworkedTests(BaseThreadedNetworkedTests): + @reap_threads + def test_with_statement(self): + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + self.assertIsNone(server.logged) - server_class = socketserver.TCPServer - imap_class = imaplib.IMAP4 + @reap_threads + def test_with_statement_logout(self): + # what happens if already logout in the block? + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + imap.logout() + self.assertIsNone(server.logged) + self.assertIsNone(server.logged) @unittest.skipUnless(ssl, "SSL not available") -class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): - +class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests): server_class = SecureTCPServer imap_class = IMAP4_SSL @@ -376,8 +820,9 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): ssl_context.check_hostname = True ssl_context.load_verify_locations(CAFILE) - with self.assertRaisesRegex(ssl.CertificateError, - "hostname '127.0.0.1' doesn't match 'localhost'"): + with self.assertRaisesRegex( + ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): with self.reaped_server(SimpleIMAPHandler) as server: client = self.imap_class(*server.server_address, ssl_context=ssl_context) @@ -389,6 +834,8 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): client.shutdown() +@unittest.skipUnless( + support.is_resource_enabled('network'), 'network resource disabled') class RemoteIMAPTest(unittest.TestCase): host = 'cyrus.andrew.cmu.edu' port = 143 @@ -422,6 +869,8 @@ class RemoteIMAPTest(unittest.TestCase): @unittest.skipUnless(ssl, "SSL not available") +@unittest.skipUnless( + support.is_resource_enabled('network'), 'network resource disabled') class RemoteIMAP_STARTTLSTest(RemoteIMAPTest): def setUp(self): @@ -475,7 +924,8 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest): def test_logincapa_with_client_ssl_context(self): with transient_internet(self.host): - _server = self.imap_class(self.host, self.port, ssl_context=self.create_ssl_context()) + _server = self.imap_class( + self.host, self.port, ssl_context=self.create_ssl_context()) self.check_logincapa(_server) def test_logout(self): @@ -486,35 +936,15 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest): def test_ssl_context_certfile_exclusive(self): with transient_internet(self.host): - self.assertRaises(ValueError, self.imap_class, self.host, self.port, - certfile=CERTFILE, ssl_context=self.create_ssl_context()) + self.assertRaises( + ValueError, self.imap_class, self.host, self.port, + certfile=CERTFILE, ssl_context=self.create_ssl_context()) def test_ssl_context_keyfile_exclusive(self): with transient_internet(self.host): - self.assertRaises(ValueError, self.imap_class, self.host, self.port, - keyfile=CERTFILE, ssl_context=self.create_ssl_context()) - - -def load_tests(*args): - tests = [TestImaplib] - - if support.is_resource_enabled('network'): - if ssl: - global CERTFILE, CAFILE - CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "keycert3.pem") - if not os.path.exists(CERTFILE): - raise support.TestFailed("Can't read certificate files!") - CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "pycacert.pem") - if not os.path.exists(CAFILE): - raise support.TestFailed("Can't read CA file!") - tests.extend([ - ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, - RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest, - ]) - - return unittest.TestSuite([unittest.makeSuite(test) for test in tests]) + self.assertRaises( + ValueError, self.imap_class, self.host, self.port, + keyfile=CERTFILE, ssl_context=self.create_ssl_context()) if __name__ == "__main__": |
