diff options
Diffstat (limited to 'Lib/test/test_imaplib.py')
-rw-r--r-- | Lib/test/test_imaplib.py | 145 |
1 files changed, 83 insertions, 62 deletions
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index b34e652..5485a2a 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -11,7 +11,8 @@ import socketserver import time import calendar -from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale +from test.support import (reap_threads, verbose, transient_internet, + run_with_tz, run_with_locale) import unittest from datetime import datetime, timezone, timedelta try: @@ -19,8 +20,8 @@ try: except ImportError: ssl = None -CERTFILE = None -CAFILE = None +CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") +CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") class TestImaplib(unittest.TestCase): @@ -41,17 +42,15 @@ class TestImaplib(unittest.TestCase): def test_Internaldate2tuple_issue10941(self): self.assertNotEqual(imaplib.Internaldate2tuple( b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'), - imaplib.Internaldate2tuple( - b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) - - + imaplib.Internaldate2tuple( + b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) def timevalues(self): return [2000000000, 2000000000.0, time.localtime(2000000000), (2033, 5, 18, 5, 33, 20, -1, -1, -1), (2033, 5, 18, 5, 33, 20, -1, -1, 1), datetime.fromtimestamp(2000000000, - timezone(timedelta(0, 2*60*60))), + timezone(timedelta(0, 2 * 60 * 60))), '"18-May-2033 05:33:20 +0200"'] @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') @@ -72,7 +71,6 @@ class TestImaplib(unittest.TestCase): if ssl: - class SecureTCPServer(socketserver.TCPServer): def get_request(self): @@ -93,13 +91,17 @@ else: class SimpleIMAPHandler(socketserver.StreamRequestHandler): - timeout = 1 continuation = None capabilities = '' + def setup(self): + super().setup() + self.server.logged = None + def _send(self, message): - if verbose: print("SENT: %r" % message.strip()) + if verbose: + print("SENT: %r" % message.strip()) self.wfile.write(message) def _send_line(self, message): @@ -132,7 +134,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): if line.endswith(b'\r\n'): break - if verbose: print('GOT: %r' % line.strip()) + if verbose: + print('GOT: %r' % line.strip()) if self.continuation: try: self.continuation.send(line) @@ -144,8 +147,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): cmd = splitline[1] args = splitline[2:] - if hasattr(self, 'cmd_'+cmd): - continuation = getattr(self, 'cmd_'+cmd)(tag, args) + if hasattr(self, 'cmd_' + cmd): + continuation = getattr(self, 'cmd_' + cmd)(tag, args) if continuation: self.continuation = continuation next(continuation) @@ -153,16 +156,25 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): self._send_tagged(tag, 'BAD', cmd + ' unknown') def cmd_CAPABILITY(self, tag, args): - caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1' + caps = ('IMAP4rev1 ' + self.capabilities + if self.capabilities + else 'IMAP4rev1') self._send_textline('* CAPABILITY ' + caps) self._send_tagged(tag, 'OK', 'CAPABILITY completed') def cmd_LOGOUT(self, tag, args): + self.server.logged = None self._send_textline('* BYE IMAP4ref1 Server logging out') self._send_tagged(tag, 'OK', 'LOGOUT completed') + def cmd_LOGIN(self, tag, args): + self.server.logged = args[0] + self._send_tagged(tag, 'OK', 'LOGIN completed') -class BaseThreadedNetworkedTests(unittest.TestCase): + +class ThreadedNetworkedTests(unittest.TestCase): + server_class = socketserver.TCPServer + imap_class = imaplib.IMAP4 def make_server(self, addr, hdlr): @@ -172,7 +184,8 @@ class BaseThreadedNetworkedTests(unittest.TestCase): self.server_close() raise - if verbose: print("creating server") + if verbose: + print("creating server") server = MyServer(addr, hdlr) self.assertEqual(server.server_address, server.socket.getsockname()) @@ -188,18 +201,21 @@ class BaseThreadedNetworkedTests(unittest.TestCase): # Short poll interval to make the test finish quickly. # Time between requests is short enough that we won't wake # up spuriously too many times. - kwargs={'poll_interval':0.01}) + kwargs={'poll_interval': 0.01}) t.daemon = True # In case this function raises. t.start() - if verbose: print("server running") + if verbose: + print("server running") return server, t def reap_server(self, server, thread): - if verbose: print("waiting for server") + if verbose: + print("waiting for server") server.shutdown() server.server_close() thread.join() - if verbose: print("done") + if verbose: + print("done") @contextmanager def reaped_server(self, hdlr): @@ -256,7 +272,7 @@ class BaseThreadedNetworkedTests(unittest.TestCase): def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'NO', 'unrecognized authentication ' - 'type {}'.format(args[0])) + 'type {}'.format(args[0])) with self.reaped_pair(MyServer) as (server, client): with self.assertRaises(imaplib.IMAP4.error): @@ -290,13 +306,13 @@ class BaseThreadedNetworkedTests(unittest.TestCase): code, data = client.authenticate('MYAUTH', lambda x: b'fake') self.assertEqual(code, 'OK') self.assertEqual(server.response, - b'ZmFrZQ==\r\n') #b64 encoded 'fake' + b'ZmFrZQ==\r\n') # b64 encoded 'fake' with self.reaped_pair(MyServer) as (server, client): code, data = client.authenticate('MYAUTH', lambda x: 'fake') self.assertEqual(code, 'OK') self.assertEqual(server.response, - b'ZmFrZQ==\r\n') #b64 encoded 'fake' + b'ZmFrZQ==\r\n') # b64 encoded 'fake' @reap_threads def test_login_cram_md5(self): @@ -307,9 +323,10 @@ class BaseThreadedNetworkedTests(unittest.TestCase): def cmd_AUTHENTICATE(self, tag, args): self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' - 'VzdG9uLm1jaS5uZXQ=') + 'VzdG9uLm1jaS5uZXQ=') r = yield - if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n': + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') else: self._send_tagged(tag, 'NO', 'No access') @@ -324,27 +341,45 @@ class BaseThreadedNetworkedTests(unittest.TestCase): ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf") self.assertEqual(ret, "OK") - def test_linetoolong(self): class TooLongHandler(SimpleIMAPHandler): def handle(self): # Send a very long response line - self.wfile.write(b'* OK ' + imaplib._MAXLINE*b'x' + b'\r\n') + self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') with self.reaped_server(TooLongHandler) as server: self.assertRaises(imaplib.IMAP4.error, self.imap_class, *server.server_address) + @reap_threads + def test_simple_with_statement(self): + # simplest call + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address): + pass -class ThreadedNetworkedTests(BaseThreadedNetworkedTests): + @reap_threads + def test_with_statement(self): + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + self.assertIsNone(server.logged) - server_class = socketserver.TCPServer - imap_class = imaplib.IMAP4 + @reap_threads + def test_with_statement_logout(self): + # what happens if already logout in the block? + with self.reaped_server(SimpleIMAPHandler) as server: + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + imap.logout() + self.assertIsNone(server.logged) + self.assertIsNone(server.logged) @unittest.skipUnless(ssl, "SSL not available") -class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): - +class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests): server_class = SecureTCPServer imap_class = IMAP4_SSL @@ -355,8 +390,9 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): ssl_context.check_hostname = True ssl_context.load_verify_locations(CAFILE) - with self.assertRaisesRegex(ssl.CertificateError, - "hostname '127.0.0.1' doesn't match 'localhost'"): + with self.assertRaisesRegex( + ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): with self.reaped_server(SimpleIMAPHandler) as server: client = self.imap_class(*server.server_address, ssl_context=ssl_context) @@ -368,6 +404,8 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): client.shutdown() +@unittest.skipUnless( + support.is_resource_enabled('network'), 'network resource disabled') class RemoteIMAPTest(unittest.TestCase): host = 'cyrus.andrew.cmu.edu' port = 143 @@ -401,6 +439,8 @@ class RemoteIMAPTest(unittest.TestCase): @unittest.skipUnless(ssl, "SSL not available") +@unittest.skipUnless( + support.is_resource_enabled('network'), 'network resource disabled') class RemoteIMAP_STARTTLSTest(RemoteIMAPTest): def setUp(self): @@ -454,7 +494,8 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest): def test_logincapa_with_client_ssl_context(self): with transient_internet(self.host): - _server = self.imap_class(self.host, self.port, ssl_context=self.create_ssl_context()) + _server = self.imap_class( + self.host, self.port, ssl_context=self.create_ssl_context()) self.check_logincapa(_server) def test_logout(self): @@ -465,35 +506,15 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest): def test_ssl_context_certfile_exclusive(self): with transient_internet(self.host): - self.assertRaises(ValueError, self.imap_class, self.host, self.port, - certfile=CERTFILE, ssl_context=self.create_ssl_context()) + self.assertRaises( + ValueError, self.imap_class, self.host, self.port, + certfile=CERTFILE, ssl_context=self.create_ssl_context()) def test_ssl_context_keyfile_exclusive(self): with transient_internet(self.host): - self.assertRaises(ValueError, self.imap_class, self.host, self.port, - keyfile=CERTFILE, ssl_context=self.create_ssl_context()) - - -def load_tests(*args): - tests = [TestImaplib] - - if support.is_resource_enabled('network'): - if ssl: - global CERTFILE, CAFILE - CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "keycert3.pem") - if not os.path.exists(CERTFILE): - raise support.TestFailed("Can't read certificate files!") - CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "pycacert.pem") - if not os.path.exists(CAFILE): - raise support.TestFailed("Can't read CA file!") - tests.extend([ - ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, - RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest, - ]) - - return unittest.TestSuite([unittest.makeSuite(test) for test in tests]) + self.assertRaises( + ValueError, self.imap_class, self.host, self.port, + keyfile=CERTFILE, ssl_context=self.create_ssl_context()) if __name__ == "__main__": |