From d395629e8272f7d09b8ac919fbea66c2c070f8e7 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Wed, 5 Nov 2008 19:48:27 +0000 Subject: Fixed issue #3727: poplib module broken by str to unicode conversion Victor strikes again! Assisted by Barry --- Lib/poplib.py | 104 ++++------------- Lib/test/test_poplib.py | 299 ++++++++++++++++++++++++++++++++++++++++++++---- Misc/NEWS | 2 + 3 files changed, 300 insertions(+), 105 deletions(-) diff --git a/Lib/poplib.py b/Lib/poplib.py index bd82841..770819e 100644 --- a/Lib/poplib.py +++ b/Lib/poplib.py @@ -75,26 +75,30 @@ class POP3: above. """ + encoding = 'UTF-8' def __init__(self, host, port=POP3_PORT, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.host = host self.port = port - self.sock = socket.create_connection((host, port), timeout) + self.sock = self._create_socket(timeout) self.file = self.sock.makefile('rb') self._debugging = 0 self.welcome = self._getresp() + def _create_socket(self, timeout): + return socket.create_connection((self.host, self.port), timeout) def _putline(self, line): if self._debugging > 1: print('*put*', repr(line)) - self.sock.sendall('%s%s' % (line, CRLF)) + self.sock.sendall(line + CRLF) # Internal: send one command to the server (through _putline()) def _putcmd(self, line): if self._debugging: print('*cmd*', repr(line)) + line = bytes(line, self.encoding) self._putline(line) @@ -123,8 +127,7 @@ class POP3: def _getresp(self): resp, o = self._getline() if self._debugging > 1: print('*resp*', repr(resp)) - c = resp[:1] - if c != b'+': + if not resp.startswith(b'+'): raise error_proto(resp) return resp @@ -136,7 +139,7 @@ class POP3: list = []; octets = 0 line, o = self._getline() while line != b'.': - if line[:2] == b'..': + if line.startswith(b'..'): o = o-1 line = line[1:] octets = octets + o @@ -266,25 +269,26 @@ class POP3: return self._shortcmd('RPOP %s' % user) - timestamp = re.compile(r'\+OK.*(<[^>]+>)') + timestamp = re.compile(br'\+OK.*(<[^>]+>)') - def apop(self, user, secret): + def apop(self, user, password): """Authorisation - only possible if server has supplied a timestamp in initial greeting. Args: - user - mailbox user; - secret - secret shared between client and server. + user - mailbox user; + password - mailbox password. NB: mailbox is locked by server from here to 'quit()' """ + secret = bytes(secret, self.encoding) m = self.timestamp.match(self.welcome) if not m: raise error_proto('-ERR APOP not supported by server') import hashlib - digest = hashlib.md5(m.group(1)+secret).digest() - digest = ''.join(map(lambda x:'%02x'%ord(x), digest)) + digest = m.group(1)+secret + digest = hashlib.md5(digest).hexdigest() return self._shortcmd('APOP %s %s' % (user, digest)) @@ -324,79 +328,19 @@ else: keyfile - PEM formatted file that countains your private key certfile - PEM formatted certificate chain file - See the methods of the parent class POP3 for more documentation. + See the methods of the parent class POP3 for more documentation. """ - def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None): - self.host = host - self.port = port + def __init__(self, host, port=POP3_SSL_PORT, + keyfile=None, certfile=None, + timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.keyfile = keyfile self.certfile = certfile - self.buffer = "" - msg = "getaddrinfo returns an empty list" - self.sock = None - for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - try: - self.sock = socket.socket(af, socktype, proto) - self.sock.connect(sa) - except socket.error as msg: - if self.sock: - self.sock.close() - self.sock = None - continue - break - if not self.sock: - raise socket.error(msg) - self.file = self.sock.makefile('rb') - self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile) - self._debugging = 0 - self.welcome = self._getresp() - - def _fillBuffer(self): - localbuf = self.sslobj.read() - if len(localbuf) == 0: - raise error_proto('-ERR EOF') - self.buffer += localbuf - - def _getline(self): - line = "" - renewline = re.compile(r'.*?\n') - match = renewline.match(self.buffer) - while not match: - self._fillBuffer() - match = renewline.match(self.buffer) - line = match.group(0) - self.buffer = renewline.sub('' ,self.buffer, 1) - if self._debugging > 1: print('*get*', repr(line)) - - octets = len(line) - if line[-2:] == CRLF: - return line[:-2], octets - if line[0] == CR: - return line[1:-1], octets - return line[:-1], octets - - def _putline(self, line): - if self._debugging > 1: print('*put*', repr(line)) - line += CRLF - bytes = len(line) - while bytes > 0: - sent = self.sslobj.write(line) - if sent == bytes: - break # avoid copy - line = line[sent:] - bytes = bytes - sent - - def quit(self): - """Signoff: commit changes on server, unlock mailbox, close connection.""" - try: - resp = self._shortcmd('QUIT') - except error_proto as val: - resp = val - self.sock.close() - del self.sslobj, self.sock - return resp + POP3.__init__(self, host, port, timeout) + + def _create_socket(self, timeout): + sock = POP3._create_socket(self, timeout) + return ssl.wrap_socket(sock, self.keyfile, self.certfile) __all__.append("POP3_SSL") diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index f9c52f9..ad00802 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -1,43 +1,284 @@ -import socket -import threading +"""Test script for poplib module.""" + +# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL +# a real test suite + import poplib +import threading +import asyncore +import asynchat +import socket +import os import time from unittest import TestCase -from test import support +from test import support as test_support -HOST = support.HOST +HOST = test_support.HOST +PORT = 0 -def server(evt, serv): - serv.listen(5) - try: - conn, addr = serv.accept() - except socket.timeout: - pass - else: - conn.send(b"+ Hola mundo\n") - conn.close() - finally: - serv.close() - evt.set() +# the dummy data returned by server when LIST and RETR commands are issued +LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' +RETR_RESP = b"""From: postmaster@python.org\ +\r\nContent-Type: text/plain\r\n\ +MIME-Version: 1.0\r\n\ +Subject: Dummy\r\n\ +\r\n\ +line1\r\n\ +line2\r\n\ +line3\r\n\ +.\r\n""" + + +class DummyPOP3Handler(asynchat.async_chat): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + self.set_terminator(b"\r\n") + self.in_buffer = [] + self.push('+OK dummy pop3 server ready.') + + def collect_incoming_data(self, data): + self.in_buffer.append(data) + + def found_terminator(self): + line = b''.join(self.in_buffer) + line = str(line, 'ISO-8859-1') + self.in_buffer = [] + cmd = line.split(' ')[0].lower() + space = line.find(' ') + if space != -1: + arg = line[space + 1:] + else: + arg = "" + if hasattr(self, 'cmd_' + cmd): + method = getattr(self, 'cmd_' + cmd) + method(arg) + else: + self.push('-ERR unrecognized POP3 command "%s".' %cmd) + + def handle_error(self): + raise + + def push(self, data): + asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') + + def cmd_echo(self, arg): + # sends back the received string (used by the test suite) + self.push(arg) + + def cmd_user(self, arg): + if arg != "guido": + self.push("-ERR no such user") + self.push('+OK password required') + + def cmd_pass(self, arg): + if arg != "python": + self.push("-ERR wrong password") + self.push('+OK 10 messages') + + def cmd_stat(self, arg): + self.push('+OK 10 100') + + def cmd_list(self, arg): + if arg: + self.push('+OK %s %s' %(arg, arg)) + else: + self.push('+OK') + asynchat.async_chat.push(self, LIST_RESP) + + cmd_uidl = cmd_list + + def cmd_retr(self, arg): + self.push('+OK %s bytes' %len(RETR_RESP)) + asynchat.async_chat.push(self, RETR_RESP) + + cmd_top = cmd_retr + + def cmd_dele(self, arg): + self.push('+OK message marked for deletion.') + + def cmd_noop(self, arg): + self.push('+OK done nothing.') + + def cmd_rpop(self, arg): + self.push('+OK done nothing.') + + +class DummyPOP3Server(asyncore.dispatcher, threading.Thread): + + handler = DummyPOP3Handler + + def __init__(self, address, af=socket.AF_INET): + threading.Thread.__init__(self) + asyncore.dispatcher.__init__(self) + self.create_socket(af, socket.SOCK_STREAM) + self.bind(address) + self.listen(5) + self.active = False + self.active_lock = threading.Lock() + self.host, self.port = self.socket.getsockname()[:2] + + def start(self): + assert not self.active + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def run(self): + self.active = True + self.__flag.set() + while self.active and asyncore.socket_map: + self.active_lock.acquire() + asyncore.loop(timeout=0.1, count=1) + self.active_lock.release() + asyncore.close_all(ignore_all=True) + + def stop(self): + assert self.active + self.active = False + self.join() + + def handle_accept(self): + conn, addr = self.accept() + self.handler = self.handler(conn) + self.close() + + def handle_connect(self): + self.close() + handle_read = handle_connect -class GeneralTests(TestCase): + def writable(self): + return 0 + + def handle_error(self): + raise + + +class TestPOP3Class(TestCase): + def assertOK(self, resp): + self.assertTrue(resp.startswith(b"+OK")) + + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.start() + self.client = poplib.POP3(self.server.host, self.server.port) + + def tearDown(self): + self.client.quit() + self.server.stop() + + def test_getwelcome(self): + self.assertEqual(self.client.getwelcome(), b'+OK dummy pop3 server ready.') + + def test_exceptions(self): + self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') + + def test_user(self): + self.assertOK(self.client.user('guido')) + self.assertRaises(poplib.error_proto, self.client.user, 'invalid') + + def test_pass_(self): + self.assertOK(self.client.pass_('python')) + self.assertRaises(poplib.error_proto, self.client.user, 'invalid') + + def test_stat(self): + self.assertEqual(self.client.stat(), (10, 100)) + + def test_list(self): + self.assertEqual(self.client.list()[1:], + ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'], + 25)) + self.assertTrue(self.client.list('1').endswith(b"OK 1 1")) + + def test_retr(self): + expected = (b'+OK 116 bytes', + [b'From: postmaster@python.org', b'Content-Type: text/plain', + b'MIME-Version: 1.0', b'Subject: Dummy', + b'', b'line1', b'line2', b'line3'], + 113) + foo = self.client.retr('foo') + self.assertEqual(foo, expected) + + def test_dele(self): + self.assertOK(self.client.dele('foo')) + + def test_noop(self): + self.assertOK(self.client.noop()) + + def test_rpop(self): + self.assertOK(self.client.rpop('foo')) + + def test_top(self): + expected = (b'+OK 116 bytes', + [b'From: postmaster@python.org', b'Content-Type: text/plain', + b'MIME-Version: 1.0', b'Subject: Dummy', b'', + b'line1', b'line2', b'line3'], + 113) + self.assertEqual(self.client.top(1, 1), expected) + + def test_uidl(self): + self.client.uidl() + self.client.uidl('foo') + + +SUPPORTS_SSL = False +if hasattr(poplib, 'POP3_SSL'): + import ssl + + SUPPORTS_SSL = True + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem") + + class DummyPOP3_SSLHandler(DummyPOP3Handler): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, + server_side=True) + self.del_channel() + self.set_socket(ssl_socket) + self.set_terminator(b"\r\n") + self.in_buffer = [] + self.push('+OK dummy pop3 server ready.') + + class TestPOP3_SSLClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3_SSL + + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.handler = DummyPOP3_SSLHandler + self.server.start() + self.client = poplib.POP3_SSL(self.server.host, self.server.port) + + def test__all__(self): + self.assert_('POP3_SSL' in poplib.__all__) + + +class TestTimeouts(TestCase): def setUp(self): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(3) - self.port = support.bind_port(self.sock) - threading.Thread(target=server, args=(self.evt,self.sock)).start() + self.port = test_support.bind_port(self.sock) + threading.Thread(target=self.server, args=(self.evt,self.sock)).start() time.sleep(.1) def tearDown(self): self.evt.wait() - def testBasic(self): - # connects - pop = poplib.POP3(HOST, self.port) - pop.sock.close() + def server(self, evt, serv): + serv.listen(5) + try: + conn, addr = serv.accept() + except socket.timeout: + pass + else: + conn.send(b"+ Hola mundo\n") + conn.close() + finally: + serv.close() + evt.set() def testTimeoutDefault(self): self.assertTrue(socket.getdefaulttimeout() is None) @@ -65,8 +306,16 @@ class GeneralTests(TestCase): pop.sock.close() -def test_main(verbose=None): - support.run_unittest(GeneralTests) +def test_main(): + tests = [TestPOP3Class, TestTimeouts] + if SUPPORTS_SSL: + tests.append(TestPOP3_SSLClass) + thread_info = test_support.threading_setup() + try: + test_support.run_unittest(*tests) + finally: + test_support.threading_cleanup(*thread_info) + if __name__ == '__main__': test_main() diff --git a/Misc/NEWS b/Misc/NEWS index ae1e63d..e422b38 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -15,6 +15,8 @@ What's New in Python 3.0 beta 5 Core and Builtins ----------------- +- Issue #3727: Fixed poplib + - Issue #3714: Fixed nntplib by using bytes where appropriate. - Issue #1210: Fixed imaplib and its documentation. -- cgit v0.12