summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorR. David Murray <rdmurray@bitdance.com>2009-12-09 15:15:31 (GMT)
committerR. David Murray <rdmurray@bitdance.com>2009-12-09 15:15:31 (GMT)
commit93321f333c7fe4a55bdedc9511667916f8bd2ed5 (patch)
tree81ab146556158c027d16da4a579ad6a1a49b825a
parent29dcdabf40f7642c96169ede9040d50a10926832 (diff)
downloadcpython-93321f333c7fe4a55bdedc9511667916f8bd2ed5.zip
cpython-93321f333c7fe4a55bdedc9511667916f8bd2ed5.tar.gz
cpython-93321f333c7fe4a55bdedc9511667916f8bd2ed5.tar.bz2
Issue 5949: fixed IMAP4_SSL hang when the IMAP server response is
missing proper end-of-line termination. Patch and tests by Scott Dial. The new tests include a test harness which will make it easier to add additional tests.
-rw-r--r--Lib/imaplib.py4
-rw-r--r--Lib/test/test_imaplib.py166
-rw-r--r--Misc/NEWS4
3 files changed, 170 insertions, 4 deletions
diff --git a/Lib/imaplib.py b/Lib/imaplib.py
index f13350e..5a45845 100644
--- a/Lib/imaplib.py
+++ b/Lib/imaplib.py
@@ -1001,6 +1001,8 @@ class IMAP4:
raise self.abort('socket error: EOF')
# Protocol mandates all lines terminated by CRLF
+ if not line.endswith('\r\n'):
+ raise self.abort('socket error: unterminated line')
line = line[:-2]
if __debug__:
@@ -1167,7 +1169,7 @@ else:
while 1:
char = self.sslobj.read(1)
line.append(char)
- if char == "\n": return ''.join(line)
+ if char in ("\n", ""): return ''.join(line)
def send(self, data):
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index ce0e075..ded8828 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -1,11 +1,27 @@
+from test import test_support as support
+# If we end up with a significant number of tests that don't require
+# threading, this test module should be split. Right now we skip
+# them all if we don't have threading.
+threading = support.import_module('threading')
+
+from contextlib import contextmanager
import imaplib
+import os.path
+import select
+import socket
+import SocketServer
+import ssl
+import sys
import time
-from test import test_support
+from test_support import reap_threads, verbose
import unittest
+CERTFILE = None
+
class TestImaplib(unittest.TestCase):
+
def test_that_Time2Internaldate_returns_a_result(self):
# We can check only that it successfully produces a result,
# not the correctness of the result itself, since the result
@@ -17,9 +33,153 @@ class TestImaplib(unittest.TestCase):
imaplib.Time2Internaldate(t)
+class SecureTCPServer(SocketServer.TCPServer):
+
+ def get_request(self):
+ newsocket, fromaddr = self.socket.accept()
+ connstream = ssl.wrap_socket(newsocket,
+ server_side=True,
+ certfile=CERTFILE)
+ return connstream, fromaddr
+
+
+class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
+
+ timeout = 1
+
+ def _send(self, message):
+ if verbose: print "SENT:", message.strip()
+ self.wfile.write(message)
+
+ def handle(self):
+ # Send a welcome message.
+ self._send('* OK IMAP4rev1\r\n')
+ while 1:
+ # Gather up input until we receive a line terminator or we timeout.
+ # Accumulate read(1) because it's simpler to handle the differences
+ # between naked sockets and SSL sockets.
+ line = ''
+ while 1:
+ try:
+ part = self.rfile.read(1)
+ if part == '':
+ # Naked sockets return empty strings..
+ return
+ line += part
+ except IOError:
+ # ..but SSLSockets throw exceptions.
+ return
+ if line.endswith('\r\n'):
+ break
+
+ if verbose: print 'GOT:', line.strip()
+ splitline = line.split()
+ tag = splitline[0]
+ cmd = splitline[1]
+ args = splitline[2:]
+
+ if hasattr(self, 'cmd_%s' % (cmd,)):
+ getattr(self, 'cmd_%s' % (cmd,))(tag, args)
+ else:
+ self._send('%s BAD %s unknown\r\n' % (tag, cmd))
+
+ def cmd_CAPABILITY(self, tag, args):
+ self._send('* CAPABILITY IMAP4rev1\r\n')
+ self._send('%s OK CAPABILITY completed\r\n' % (tag,))
+
+
+class BaseThreadedNetworkedTests(unittest.TestCase):
+
+ def make_server(self, addr, hdlr):
+
+ class MyServer(self.server_class):
+ def handle_error(self, request, client_address):
+ self.close_request(request)
+ self.server_close()
+ raise
+
+ if verbose: print "creating server"
+ server = MyServer(addr, hdlr)
+ self.assertEquals(server.server_address, server.socket.getsockname())
+
+ if verbose:
+ print "server created"
+ print "ADDR =", addr
+ print "CLASS =", self.server_class
+ print "HDLR =", server.RequestHandlerClass
+
+ t = threading.Thread(
+ name='%s serving' % self.server_class,
+ target=server.serve_forever,
+ # Short poll interval to make the test finish quickly.
+ # Time between requests is short enough that we won't wake
+ # up spuriously too many times.
+ kwargs={'poll_interval':0.01})
+ t.daemon = True # In case this function raises.
+ t.start()
+ if verbose: print "server running"
+ return server, t
+
+ def reap_server(self, server, thread):
+ if verbose: print "waiting for server"
+ server.shutdown()
+ thread.join()
+ if verbose: print "done"
+
+ @contextmanager
+ def reaped_server(self, hdlr):
+ server, thread = self.make_server((support.HOST, 0), hdlr)
+ try:
+ yield server
+ finally:
+ self.reap_server(server, thread)
+
+ @reap_threads
+ def test_connect(self):
+ with self.reaped_server(SimpleIMAPHandler) as server:
+ client = self.imap_class(*server.server_address)
+ client.shutdown()
+
+ @reap_threads
+ def test_issue5949(self):
+
+ class EOFHandler(SocketServer.StreamRequestHandler):
+ def handle(self):
+ # EOF without sending a complete welcome message.
+ self.wfile.write('* OK')
+
+ with self.reaped_server(EOFHandler) as server:
+ self.assertRaises(imaplib.IMAP4.abort,
+ self.imap_class, *server.server_address)
+
+
+class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
+
+ server_class = SocketServer.TCPServer
+ imap_class = imaplib.IMAP4
+
+
+class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
+
+ server_class = SecureTCPServer
+ imap_class = imaplib.IMAP4_SSL
+
+
def test_main():
- test_support.run_unittest(TestImaplib)
+
+ tests = [TestImaplib]
+
+ if support.is_resource_enabled('network'):
+ global CERTFILE
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
+ "keycert.pem")
+ if not os.path.exists(CERTFILE):
+ raise support.TestFailed("Can't read certificate files!")
+ tests.extend([ThreadedNetworkedTests, ThreadedNetworkedTestsSSL])
+
+ support.run_unittest(*tests)
if __name__ == "__main__":
- unittest.main()
+ support.use_resources = ['network']
+ test_main()
diff --git a/Misc/NEWS b/Misc/NEWS
index 95fae08..845ed7a 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -15,9 +15,13 @@ Core and Builtins
Library
-------
+- Issue #5949: fixed IMAP4_SSL hang when the IMAP server response is
+ missing proper end-of-line termination.
+
- Issue #7457: added a read_pkg_file method to
distutils.dist.DistributionMetadata.
+
What's New in Python 2.7 alpha 1
================================