diff options
Diffstat (limited to 'Lib/test/test_imaplib.py')
-rw-r--r-- | Lib/test/test_imaplib.py | 224 |
1 files changed, 162 insertions, 62 deletions
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 96b4f32..8248656 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -11,7 +11,8 @@ import socketserver import time import calendar -from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale +from test.support import (reap_threads, verbose, transient_internet, + run_with_tz, run_with_locale) import unittest from datetime import datetime, timezone, timedelta try: @@ -19,8 +20,8 @@ try: except ImportError: ssl = None -CERTFILE = None -CAFILE = None +CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") +CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") class TestImaplib(unittest.TestCase): @@ -41,17 +42,15 @@ class TestImaplib(unittest.TestCase): def test_Internaldate2tuple_issue10941(self): self.assertNotEqual(imaplib.Internaldate2tuple( b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'), - imaplib.Internaldate2tuple( - b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) - - + imaplib.Internaldate2tuple( + b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) def timevalues(self): return [2000000000, 2000000000.0, time.localtime(2000000000), (2033, 5, 18, 5, 33, 20, -1, -1, -1), (2033, 5, 18, 5, 33, 20, -1, -1, 1), datetime.fromtimestamp(2000000000, - timezone(timedelta(0, 2*60*60))), + timezone(timedelta(0, 2 * 60 * 60))), '"18-May-2033 05:33:20 +0200"'] @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') @@ -72,7 +71,6 @@ class TestImaplib(unittest.TestCase): if ssl: - class SecureTCPServer(socketserver.TCPServer): def get_request(self): @@ -93,13 +91,17 @@ else: class SimpleIMAPHandler(socketserver.StreamRequestHandler): - timeout = 1 continuation = None capabilities = '' + def setup(self): + super().setup() + self.server.logged = None + def _send(self, message): - if verbose: print("SENT: %r" % message.strip()) + if verbose: + print("SENT: %r" % message.strip()) self.wfile.write(message) def _send_line(self, message): @@ -132,7 +134,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): if line.endswith(b'\r\n'): break - if verbose: print('GOT: %r' % line.strip()) + if verbose: + print('GOT: %r' % line.strip()) if self.continuation: try: self.continuation.send(line) @@ -144,8 +147,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): cmd = splitline[1] args = splitline[2:] - if hasattr(self, 'cmd_'+cmd): - continuation = getattr(self, 'cmd_'+cmd)(tag, args) + if hasattr(self, 'cmd_' + cmd): + continuation = getattr(self, 'cmd_' + cmd)(tag, args) if continuation: self.continuation = continuation next(continuation) @@ -153,16 +156,25 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): self._send_tagged(tag, 'BAD', cmd + ' unknown') def cmd_CAPABILITY(self, tag, args): - caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1' + caps = ('IMAP4rev1 ' + self.capabilities + if self.capabilities + else 'IMAP4rev1') self._send_textline('* CAPABILITY ' + caps) self._send_tagged(tag, 'OK', 'CAPABILITY completed') def cmd_LOGOUT(self, tag, args): + self.server.logged = None self._send_textline('* BYE IMAP4ref1 Server logging out') self._send_tagged(tag, 'OK', 'LOGOUT completed') + def cmd_LOGIN(self, tag, args): + self.server.logged = args[0] + self._send_tagged(tag, 'OK', 'LOGIN completed') -class BaseThreadedNetworkedTests(unittest.TestCase): + +class ThreadedNetworkedTests(unittest.TestCase): + server_class = socketserver.TCPServer + imap_class = imaplib.IMAP4 def make_server(self, addr, hdlr): @@ -172,7 +184,8 @@ class BaseThreadedNetworkedTests(unittest.TestCase): self.server_close() raise - if verbose: print("creating server") + if verbose: + print("creating server") server = MyServer(addr, hdlr) self.assertEqual(server.server_address, server.socket.getsockname()) @@ -188,18 +201,21 @@ class BaseThreadedNetworkedTests(unittest.TestCase): # Short poll interval to make the test finish quickly. # Time between requests is short enough that we won't wake # up spuriously too many times. - kwargs={'poll_interval':0.01}) + kwargs={'poll_interval': 0.01}) t.daemon = True # In case this function raises. t.start() - if verbose: print("server running") + if verbose: + print("server running") return server, t def reap_server(self, server, thread): - if verbose: print("waiting for server") + if verbose: + print("waiting for server") server.shutdown() server.server_close() thread.join() - if verbose: print("done") + if verbose: + print("done") @contextmanager def reaped_server(self, hdlr): @@ -249,6 +265,84 @@ class BaseThreadedNetworkedTests(unittest.TestCase): self.assertRaises(imaplib.IMAP4.abort, self.imap_class, *server.server_address) + class UTF8Server(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + + @reap_threads + def test_enable_raises_error_if_not_AUTH(self): + with self.reaped_pair(self.UTF8Server) as (server, client): + self.assertFalse(client.utf8_enabled) + self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') + self.assertFalse(client.utf8_enabled) + + # XXX Also need a test that enable after SELECT raises an error. + + @reap_threads + def test_enable_raises_error_if_no_capability(self): + class NoEnableServer(self.UTF8Server): + capabilities = 'AUTH' + with self.reaped_pair(NoEnableServer) as (server, client): + self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') + + @reap_threads + def test_enable_UTF8_raises_error_if_not_supported(self): + class NonUTF8Server(SimpleIMAPHandler): + pass + with self.assertRaises(imaplib.IMAP4.error): + with self.reaped_pair(NonUTF8Server) as (server, client): + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + client.enable('UTF8=ACCEPT') + pass + + @reap_threads + def test_enable_UTF8_True_append(self): + + class UTF8AppendServer(self.UTF8Server): + def cmd_APPEND(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'okay') + + with self.reaped_pair(UTF8AppendServer) as (server, client): + self.assertEqual(client._encoding, 'ascii') + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, + b'ZmFrZQ==\r\n') # b64 encoded 'fake' + code, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(code, 'OK') + self.assertEqual(client._encoding, 'utf-8') + msg_string = 'Subject: üñí©öðé' + typ, data = client.append( + None, None, None, msg_string.encode('utf-8')) + self.assertEqual(typ, 'OK') + self.assertEqual( + server.response, + ('UTF8 (%s)\r\n' % msg_string).encode('utf-8') + ) + + # XXX also need a test that makes sure that the Literal and Untagged_status + # regexes uses unicode in UTF8 mode instead of the default ASCII. + + @reap_threads + def test_search_disallows_charset_in_utf8_mode(self): + with self.reaped_pair(self.UTF8Server) as (server, client): + typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(typ, 'OK') + typ, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(typ, 'OK') + self.assertTrue(client.utf8_enabled) + self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar') + @reap_threads def test_bad_auth_name(self): @@ -256,7 +350,7 @@ class BaseThreadedNetworkedTests(unittest.TestCase): def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'NO', 'unrecognized authentication ' - 'type {}'.format(args[0])) + 'type {}'.format(args[0])) with self.reaped_pair(MyServer) as (server, client): with self.assertRaises(imaplib.IMAP4.error): @@ -290,13 +384,13 @@ class BaseThreadedNetworkedTests(unittest.TestCase): code, data = client.authenticate('MYAUTH', lambda x: b'fake') self.assertEqual(code, 'OK') self.assertEqual(server.response, - b'ZmFrZQ==\r\n') #b64 encoded 'fake' + b'ZmFrZQ==\r\n') # b64 encoded 'fake' with self.reaped_pair(MyServer) as (server, client): code, data = client.authenticate('MYAUTH', lambda x: 'fake') self.assertEqual(code, 'OK') self.assertEqual(server.response, - b'ZmFrZQ==\r\n') #b64 encoded 'fake' + b'ZmFrZQ==\r\n') # b64 encoded 'fake' @reap_threads def test_login_cram_md5(self): @@ -307,9 +401,10 @@ class BaseThreadedNetworkedTests(unittest.TestCase): def cmd_AUTHENTICATE(self, tag, args): self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' - 'VzdG9uLm1jaS5uZXQ=') + 'VzdG9uLm1jaS5uZXQ=') r = yield - if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n': + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') else: self._send_tagged(tag, 'NO', 'No access') @@ -325,7 +420,6 @@ class BaseThreadedNetworkedTests(unittest.TestCase): self.assertEqual(ret, "OK") - @reap_threads def test_aborted_authentication(self): @@ -344,26 +438,46 @@ class BaseThreadedNetworkedTests(unittest.TestCase): with self.assertRaises(imaplib.IMAP4.error): code, data = client.authenticate('MYAUTH', lambda x: None) + def test_linetoolong(self): class TooLongHandler(SimpleIMAPHandler): def handle(self): # Send a very long response line - self.wfile.write(b'* OK ' + imaplib._MAXLINE*b'x' + b'\r\n') + self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') with self.reaped_server(TooLongHandler) as server: self.assertRaises(imaplib.IMAP4.error, self.imap_class, *server.server_address) + @reap_threads + def test_simple_with_statement(self): + # simplest call + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address): + pass -class ThreadedNetworkedTests(BaseThreadedNetworkedTests): + @reap_threads + def test_with_statement(self): + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + self.assertIsNone(server.logged) - server_class = socketserver.TCPServer - imap_class = imaplib.IMAP4 + @reap_threads + def test_with_statement_logout(self): + # what happens if already logout in the block? + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + imap.logout() + self.assertIsNone(server.logged) + self.assertIsNone(server.logged) @unittest.skipUnless(ssl, "SSL not available") -class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): - +class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests): server_class = SecureTCPServer imap_class = IMAP4_SSL @@ -374,8 +488,9 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): ssl_context.check_hostname = True ssl_context.load_verify_locations(CAFILE) - with self.assertRaisesRegex(ssl.CertificateError, - "hostname '127.0.0.1' doesn't match 'localhost'"): + with self.assertRaisesRegex( + ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): with self.reaped_server(SimpleIMAPHandler) as server: client = self.imap_class(*server.server_address, ssl_context=ssl_context) @@ -387,6 +502,8 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): client.shutdown() +@unittest.skipUnless( + support.is_resource_enabled('network'), 'network resource disabled') class RemoteIMAPTest(unittest.TestCase): host = 'cyrus.andrew.cmu.edu' port = 143 @@ -420,6 +537,8 @@ class RemoteIMAPTest(unittest.TestCase): @unittest.skipUnless(ssl, "SSL not available") +@unittest.skipUnless( + support.is_resource_enabled('network'), 'network resource disabled') class RemoteIMAP_STARTTLSTest(RemoteIMAPTest): def setUp(self): @@ -473,7 +592,8 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest): def test_logincapa_with_client_ssl_context(self): with transient_internet(self.host): - _server = self.imap_class(self.host, self.port, ssl_context=self.create_ssl_context()) + _server = self.imap_class( + self.host, self.port, ssl_context=self.create_ssl_context()) self.check_logincapa(_server) def test_logout(self): @@ -484,35 +604,15 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest): def test_ssl_context_certfile_exclusive(self): with transient_internet(self.host): - self.assertRaises(ValueError, self.imap_class, self.host, self.port, - certfile=CERTFILE, ssl_context=self.create_ssl_context()) + self.assertRaises( + ValueError, self.imap_class, self.host, self.port, + certfile=CERTFILE, ssl_context=self.create_ssl_context()) def test_ssl_context_keyfile_exclusive(self): with transient_internet(self.host): - self.assertRaises(ValueError, self.imap_class, self.host, self.port, - keyfile=CERTFILE, ssl_context=self.create_ssl_context()) - - -def load_tests(*args): - tests = [TestImaplib] - - if support.is_resource_enabled('network'): - if ssl: - global CERTFILE, CAFILE - CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "keycert3.pem") - if not os.path.exists(CERTFILE): - raise support.TestFailed("Can't read certificate files!") - CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "pycacert.pem") - if not os.path.exists(CAFILE): - raise support.TestFailed("Can't read CA file!") - tests.extend([ - ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, - RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest, - ]) - - return unittest.TestSuite([unittest.makeSuite(test) for test in tests]) + self.assertRaises( + ValueError, self.imap_class, self.host, self.port, + keyfile=CERTFILE, ssl_context=self.create_ssl_context()) if __name__ == "__main__": |