summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_imaplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_imaplib.py')
-rw-r--r--Lib/test/test_imaplib.py127
1 files changed, 118 insertions, 9 deletions
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index 0e15fa3..30ae6a4 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -94,14 +94,25 @@ else:
class SimpleIMAPHandler(socketserver.StreamRequestHandler):
timeout = 1
+ continuation = None
+ capabilities = ''
def _send(self, message):
if verbose: print("SENT: %r" % message.strip())
self.wfile.write(message)
+ def _send_line(self, message):
+ self._send(message + b'\r\n')
+
+ def _send_textline(self, message):
+ self._send_line(message.encode('ASCII'))
+
+ def _send_tagged(self, tag, code, message):
+ self._send_textline(' '.join((tag, code, message)))
+
def handle(self):
# Send a welcome message.
- self._send(b'* OK IMAP4rev1\r\n')
+ self._send_textline('* OK IMAP4rev1')
while 1:
# Gather up input until we receive a line terminator or we timeout.
# Accumulate read(1) because it's simpler to handle the differences
@@ -121,19 +132,33 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler):
break
if verbose: print('GOT: %r' % line.strip())
- splitline = line.split()
- tag = splitline[0].decode('ASCII')
- cmd = splitline[1].decode('ASCII')
+ if self.continuation:
+ try:
+ self.continuation.send(line)
+ except StopIteration:
+ self.continuation = None
+ continue
+ splitline = line.decode('ASCII').split()
+ tag = splitline[0]
+ cmd = splitline[1]
args = splitline[2:]
if hasattr(self, 'cmd_'+cmd):
- getattr(self, 'cmd_'+cmd)(tag, args)
+ continuation = getattr(self, 'cmd_'+cmd)(tag, args)
+ if continuation:
+ self.continuation = continuation
+ next(continuation)
else:
- self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
+ self._send_tagged(tag, 'BAD', cmd + ' unknown')
def cmd_CAPABILITY(self, tag, args):
- self._send(b'* CAPABILITY IMAP4rev1\r\n')
- self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+ caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1'
+ self._send_textline('* CAPABILITY ' + caps)
+ self._send_tagged(tag, 'OK', 'CAPABILITY completed')
+
+ def cmd_LOGOUT(self, tag, args):
+ self._send_textline('* BYE IMAP4ref1 Server logging out')
+ self._send_tagged(tag, 'OK', 'LOGOUT completed')
class BaseThreadedNetworkedTests(unittest.TestCase):
@@ -183,6 +208,16 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
finally:
self.reap_server(server, thread)
+ @contextmanager
+ def reaped_pair(self, hdlr):
+ server, thread = self.make_server((support.HOST, 0), hdlr)
+ client = self.imap_class(*server.server_address)
+ try:
+ yield server, client
+ finally:
+ client.logout()
+ self.reap_server(server, thread)
+
@reap_threads
def test_connect(self):
with self.reaped_server(SimpleIMAPHandler) as server:
@@ -208,12 +243,86 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
def cmd_CAPABILITY(self, tag, args):
self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
- self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+ self._send_tagged(tag, 'OK', 'CAPABILITY completed')
with self.reaped_server(BadNewlineHandler) as server:
self.assertRaises(imaplib.IMAP4.abort,
self.imap_class, *server.server_address)
+ @reap_threads
+ def test_bad_auth_name(self):
+
+ class MyServer(SimpleIMAPHandler):
+
+ def cmd_AUTHENTICATE(self, tag, args):
+ self._send_tagged(tag, 'NO', 'unrecognized authentication '
+ 'type {}'.format(args[0]))
+
+ with self.reaped_pair(MyServer) as (server, client):
+ with self.assertRaises(imaplib.IMAP4.error):
+ client.authenticate('METHOD', lambda: 1)
+
+ @reap_threads
+ def test_invalid_authentication(self):
+
+ class MyServer(SimpleIMAPHandler):
+
+ def cmd_AUTHENTICATE(self, tag, args):
+ self._send_textline('+')
+ self.response = yield
+ self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
+
+ with self.reaped_pair(MyServer) as (server, client):
+ with self.assertRaises(imaplib.IMAP4.error):
+ code, data = client.authenticate('MYAUTH', lambda x: b'fake')
+
+ @reap_threads
+ def test_valid_authentication(self):
+
+ class MyServer(SimpleIMAPHandler):
+
+ def cmd_AUTHENTICATE(self, tag, args):
+ self._send_textline('+')
+ self.server.response = yield
+ self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
+
+ with self.reaped_pair(MyServer) as (server, client):
+ code, data = client.authenticate('MYAUTH', lambda x: b'fake')
+ self.assertEqual(code, 'OK')
+ self.assertEqual(server.response,
+ b'ZmFrZQ==\r\n') #b64 encoded 'fake'
+
+ with self.reaped_pair(MyServer) as (server, client):
+ code, data = client.authenticate('MYAUTH', lambda x: 'fake')
+ self.assertEqual(code, 'OK')
+ self.assertEqual(server.response,
+ b'ZmFrZQ==\r\n') #b64 encoded 'fake'
+
+ @reap_threads
+ def test_login_cram_md5(self):
+
+ class AuthHandler(SimpleIMAPHandler):
+
+ capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
+
+ def cmd_AUTHENTICATE(self, tag, args):
+ self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
+ 'VzdG9uLm1jaS5uZXQ=')
+ r = yield
+ if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n':
+ self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
+ else:
+ self._send_tagged(tag, 'NO', 'No access')
+
+ with self.reaped_pair(AuthHandler) as (server, client):
+ self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
+ ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
+ self.assertEqual(ret, "OK")
+
+ with self.reaped_pair(AuthHandler) as (server, client):
+ self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
+ ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
+ self.assertEqual(ret, "OK")
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):