diff options
author | R David Murray <rdmurray@bitdance.com> | 2013-02-19 17:17:31 (GMT) |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2013-02-19 17:17:31 (GMT) |
commit | 774a39f26ef2fa8ed96f3c52d3edac5e93926b4b (patch) | |
tree | a7fb7a639755bc6843c6189d9b928e0039433bcd /Lib | |
parent | 6b30759022d836099a0844983816edaa5e64f52f (diff) | |
download | cpython-774a39f26ef2fa8ed96f3c52d3edac5e93926b4b.zip cpython-774a39f26ef2fa8ed96f3c52d3edac5e93926b4b.tar.gz cpython-774a39f26ef2fa8ed96f3c52d3edac5e93926b4b.tar.bz2 |
#13700: Make imap.authenticate with authobject work.
This fixes a bytes/string confusion in the API which prevented
custom authobjects from working at all.
Original patch by Erno Tukia.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/imaplib.py | 20 | ||||
-rw-r--r-- | Lib/test/test_imaplib.py | 127 |
2 files changed, 130 insertions, 17 deletions
diff --git a/Lib/imaplib.py b/Lib/imaplib.py index c0334d8..00a17fb 100644 --- a/Lib/imaplib.py +++ b/Lib/imaplib.py @@ -360,10 +360,10 @@ class IMAP4: data = authobject(response) - It will be called to process server continuation responses. - It should return data that will be encoded and sent to server. - It should return None if the client abort response '*' should - be sent instead. + It will be called to process server continuation responses; the + response argument it is passed will be a bytes. It should return bytes + data that will be base64 encoded and sent to the server. It should + return None if the client abort response '*' should be sent instead. """ mech = mechanism.upper() # XXX: shouldn't this code be removed, not commented out? @@ -546,7 +546,9 @@ class IMAP4: def _CRAM_MD5_AUTH(self, challenge): """ Authobject to use with CRAM-MD5 authentication. """ import hmac - return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest() + pwd = (self.password.encode('ASCII') if isinstance(self.password, str) + else self.password) + return self.user + " " + hmac.HMAC(pwd, challenge).hexdigest() def logout(self): @@ -1288,14 +1290,16 @@ class _Authenticator: # so when it gets to the end of the 8-bit input # there's no partial 6-bit output. # - oup = '' + oup = b'' + if isinstance(inp, str): + inp = inp.encode('ASCII') while inp: if len(inp) > 48: t = inp[:48] inp = inp[48:] else: t = inp - inp = '' + inp = b'' e = binascii.b2a_base64(t) if e: oup = oup + e[:-1] @@ -1303,7 +1307,7 @@ class _Authenticator: def decode(self, inp): if not inp: - return '' + return b'' return binascii.a2b_base64(inp) diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 43ee7df..6b29943 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -78,14 +78,25 @@ else: class SimpleIMAPHandler(socketserver.StreamRequestHandler): timeout = 1 + continuation = None + capabilities = '' def _send(self, message): if verbose: print("SENT: %r" % message.strip()) self.wfile.write(message) + def _send_line(self, message): + self._send(message + b'\r\n') + + def _send_textline(self, message): + self._send_line(message.encode('ASCII')) + + def _send_tagged(self, tag, code, message): + self._send_textline(' '.join((tag, code, message))) + def handle(self): # Send a welcome message. - self._send(b'* OK IMAP4rev1\r\n') + self._send_textline('* OK IMAP4rev1') while 1: # Gather up input until we receive a line terminator or we timeout. # Accumulate read(1) because it's simpler to handle the differences @@ -105,19 +116,33 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): break if verbose: print('GOT: %r' % line.strip()) - splitline = line.split() - tag = splitline[0].decode('ASCII') - cmd = splitline[1].decode('ASCII') + if self.continuation: + try: + self.continuation.send(line) + except StopIteration: + self.continuation = None + continue + splitline = line.decode('ASCII').split() + tag = splitline[0] + cmd = splitline[1] args = splitline[2:] if hasattr(self, 'cmd_'+cmd): - getattr(self, 'cmd_'+cmd)(tag, args) + continuation = getattr(self, 'cmd_'+cmd)(tag, args) + if continuation: + self.continuation = continuation + next(continuation) else: - self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII')) + self._send_tagged(tag, 'BAD', cmd + ' unknown') def cmd_CAPABILITY(self, tag, args): - self._send(b'* CAPABILITY IMAP4rev1\r\n') - self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) + caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1' + self._send_textline('* CAPABILITY ' + caps) + self._send_tagged(tag, 'OK', 'CAPABILITY completed') + + def cmd_LOGOUT(self, tag, args): + self._send_textline('* BYE IMAP4ref1 Server logging out') + self._send_tagged(tag, 'OK', 'LOGOUT completed') class BaseThreadedNetworkedTests(unittest.TestCase): @@ -167,6 +192,16 @@ class BaseThreadedNetworkedTests(unittest.TestCase): finally: self.reap_server(server, thread) + @contextmanager + def reaped_pair(self, hdlr): + server, thread = self.make_server((support.HOST, 0), hdlr) + client = self.imap_class(*server.server_address) + try: + yield server, client + finally: + client.logout() + self.reap_server(server, thread) + @reap_threads def test_connect(self): with self.reaped_server(SimpleIMAPHandler) as server: @@ -192,12 +227,86 @@ class BaseThreadedNetworkedTests(unittest.TestCase): def cmd_CAPABILITY(self, tag, args): self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') - self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) + self._send_tagged(tag, 'OK', 'CAPABILITY completed') with self.reaped_server(BadNewlineHandler) as server: self.assertRaises(imaplib.IMAP4.abort, self.imap_class, *server.server_address) + @reap_threads + def test_bad_auth_name(self): + + class MyServer(SimpleIMAPHandler): + + def cmd_AUTHENTICATE(self, tag, args): + self._send_tagged(tag, 'NO', 'unrecognized authentication ' + 'type {}'.format(args[0])) + + with self.reaped_pair(MyServer) as (server, client): + with self.assertRaises(imaplib.IMAP4.error): + client.authenticate('METHOD', lambda: 1) + + @reap_threads + def test_invalid_authentication(self): + + class MyServer(SimpleIMAPHandler): + + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.response = yield + self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') + + with self.reaped_pair(MyServer) as (server, client): + with self.assertRaises(imaplib.IMAP4.error): + code, data = client.authenticate('MYAUTH', lambda x: b'fake') + + @reap_threads + def test_valid_authentication(self): + + class MyServer(SimpleIMAPHandler): + + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + + with self.reaped_pair(MyServer) as (server, client): + code, data = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, + b'ZmFrZQ==\r\n') #b64 encoded 'fake' + + with self.reaped_pair(MyServer) as (server, client): + code, data = client.authenticate('MYAUTH', lambda x: 'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, + b'ZmFrZQ==\r\n') #b64 encoded 'fake' + + @reap_threads + def test_login_cram_md5(self): + + class AuthHandler(SimpleIMAPHandler): + + capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' + + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' + 'VzdG9uLm1jaS5uZXQ=') + r = yield + if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n': + self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') + else: + self._send_tagged(tag, 'NO', 'No access') + + with self.reaped_pair(AuthHandler) as (server, client): + self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) + ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf") + self.assertEqual(ret, "OK") + + with self.reaped_pair(AuthHandler) as (server, client): + self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) + ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf") + self.assertEqual(ret, "OK") class ThreadedNetworkedTests(BaseThreadedNetworkedTests): |