summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorR. David Murray <rdmurray@bitdance.com>2009-12-10 02:08:06 (GMT)
committerR. David Murray <rdmurray@bitdance.com>2009-12-10 02:08:06 (GMT)
commite8dc258db5898f5bbeb60c6780d1a3cb41585afe (patch)
treed4da1ddcb5749df406772ed01141e386d736cf92 /Lib
parente5fdedbeda147f3878f82349464f28f184c01658 (diff)
downloadcpython-e8dc258db5898f5bbeb60c6780d1a3cb41585afe.zip
cpython-e8dc258db5898f5bbeb60c6780d1a3cb41585afe.tar.gz
cpython-e8dc258db5898f5bbeb60c6780d1a3cb41585afe.tar.bz2
Merged revisions 76726-76727 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk The merge adds a test with an invalid rather than a missing line end, since the py3K code passed the original issue 5949 test. New test also by Scott Dial. ........ r76726 | r.david.murray | 2009-12-09 10:15:31 -0500 (Wed, 09 Dec 2009) | 6 lines Issue 5949: fixed IMAP4_SSL hang when the IMAP server response is missing proper end-of-line termination. Patch and tests by Scott Dial. The new tests include a test harness which will make it easier to add additional tests. ........ r76727 | r.david.murray | 2009-12-09 11:41:39 -0500 (Wed, 09 Dec 2009) | 2 lines Skip new imaplib SSL tests if ssl is not available. ........
Diffstat (limited to 'Lib')
-rw-r--r--Lib/imaplib.py2
-rw-r--r--Lib/test/test_imaplib.py197
2 files changed, 196 insertions, 3 deletions
diff --git a/Lib/imaplib.py b/Lib/imaplib.py
index df35cb1..f803a5c 100644
--- a/Lib/imaplib.py
+++ b/Lib/imaplib.py
@@ -1023,6 +1023,8 @@ class IMAP4:
raise self.abort('socket error: EOF')
# Protocol mandates all lines terminated by CRLF
+ if not line.endswith(b'\r\n'):
+ raise self.abort('socket error: unterminated line')
line = line[:-2]
if __debug__:
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index 2105fc2..fb5cf82 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -1,11 +1,31 @@
+from test import support
+# If we end up with a significant number of tests that don't require
+# threading, this test module should be split. Right now we skip
+# them all if we don't have threading.
+threading = support.import_module('threading')
+
+from contextlib import contextmanager
import imaplib
+import os.path
+import select
+import socket
+import socketserver
+import sys
import time
-from test import support
+from test.support import reap_threads, verbose
import unittest
+try:
+ import ssl
+except ImportError:
+ ssl = None
+
+CERTFILE = None
+
class TestImaplib(unittest.TestCase):
+
def test_that_Time2Internaldate_returns_a_result(self):
# We can check only that it successfully produces a result,
# not the correctness of the result itself, since the result
@@ -17,9 +37,180 @@ class TestImaplib(unittest.TestCase):
imaplib.Time2Internaldate(t)
+if ssl:
+
+ class SecureTCPServer(socketserver.TCPServer):
+
+ def get_request(self):
+ newsocket, fromaddr = self.socket.accept()
+ connstream = ssl.wrap_socket(newsocket,
+ server_side=True,
+ certfile=CERTFILE)
+ return connstream, fromaddr
+
+ IMAP4_SSL = imaplib.IMAP4_SSL
+
+else:
+
+ class SecureTCPServer:
+ pass
+
+ IMAP4_SSL = None
+
+
+class SimpleIMAPHandler(socketserver.StreamRequestHandler):
+
+ timeout = 1
+
+ def _send(self, message):
+ if verbose: print("SENT:", message.strip())
+ self.wfile.write(message)
+
+ def handle(self):
+ # Send a welcome message.
+ self._send(b'* OK IMAP4rev1\r\n')
+ while 1:
+ # Gather up input until we receive a line terminator or we timeout.
+ # Accumulate read(1) because it's simpler to handle the differences
+ # between naked sockets and SSL sockets.
+ line = b''
+ while 1:
+ try:
+ part = self.rfile.read(1)
+ if part == b'':
+ # Naked sockets return empty strings..
+ return
+ line += part
+ except IOError:
+ # ..but SSLSockets throw exceptions.
+ return
+ if line.endswith(b'\r\n'):
+ break
+
+ if verbose: print('GOT:', line.strip())
+ splitline = line.split()
+ tag = splitline[0].decode('ASCII')
+ cmd = splitline[1].decode('ASCII')
+ args = splitline[2:]
+
+ if hasattr(self, 'cmd_'+cmd):
+ getattr(self, 'cmd_'+cmd)(tag, args)
+ else:
+ self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
+
+ def cmd_CAPABILITY(self, tag, args):
+ self._send(b'* CAPABILITY IMAP4rev1\r\n')
+ self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+
+
+class BaseThreadedNetworkedTests(unittest.TestCase):
+
+ def make_server(self, addr, hdlr):
+
+ class MyServer(self.server_class):
+ def handle_error(self, request, client_address):
+ self.close_request(request)
+ self.server_close()
+ raise
+
+ if verbose: print("creating server")
+ server = MyServer(addr, hdlr)
+ self.assertEquals(server.server_address, server.socket.getsockname())
+
+ if verbose:
+ print("server created")
+ print("ADDR =", addr)
+ print("CLASS =", self.server_class)
+ print("HDLR =", server.RequestHandlerClass)
+
+ t = threading.Thread(
+ name='%s serving' % self.server_class,
+ target=server.serve_forever,
+ # Short poll interval to make the test finish quickly.
+ # Time between requests is short enough that we won't wake
+ # up spuriously too many times.
+ kwargs={'poll_interval':0.01})
+ t.daemon = True # In case this function raises.
+ t.start()
+ if verbose: print("server running")
+ return server, t
+
+ def reap_server(self, server, thread):
+ if verbose: print("waiting for server")
+ server.shutdown()
+ thread.join()
+ if verbose: print("done")
+
+ @contextmanager
+ def reaped_server(self, hdlr):
+ server, thread = self.make_server((support.HOST, 0), hdlr)
+ try:
+ yield server
+ finally:
+ self.reap_server(server, thread)
+
+ @reap_threads
+ def test_connect(self):
+ with self.reaped_server(SimpleIMAPHandler) as server:
+ client = self.imap_class(*server.server_address)
+ client.shutdown()
+
+ @reap_threads
+ def test_issue5949(self):
+
+ class EOFHandler(socketserver.StreamRequestHandler):
+ def handle(self):
+ # EOF without sending a complete welcome message.
+ self.wfile.write(b'* OK')
+
+ with self.reaped_server(EOFHandler) as server:
+ self.assertRaises(imaplib.IMAP4.abort,
+ self.imap_class, *server.server_address)
+
+ @reap_threads
+ def test_line_termination(self):
+
+ class BadNewlineHandler(SimpleIMAPHandler):
+
+ def cmd_CAPABILITY(self, tag, args):
+ self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
+ self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+
+ with self.reaped_server(BadNewlineHandler) as server:
+ self.assertRaises(imaplib.IMAP4.abort,
+ self.imap_class, *server.server_address)
+
+
+
+class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
+
+ server_class = socketserver.TCPServer
+ imap_class = imaplib.IMAP4
+
+
+@unittest.skipUnless(ssl, "SSL not available")
+class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
+
+ server_class = SecureTCPServer
+ imap_class = IMAP4_SSL
+
+
def test_main():
- support.run_unittest(TestImaplib)
+
+ tests = [TestImaplib]
+
+ if support.is_resource_enabled('network'):
+ if ssl:
+ global CERTFILE
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
+ "keycert.pem")
+ if not os.path.exists(CERTFILE):
+ raise support.TestFailed("Can't read certificate files!")
+ tests.extend([ThreadedNetworkedTests, ThreadedNetworkedTestsSSL])
+
+ support.run_unittest(*tests)
if __name__ == "__main__":
- unittest.main()
+ support.use_resources = ['network']
+ test_main()