summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/imaplib.py2
-rw-r--r--Lib/test/test_imaplib.py197
-rw-r--r--Misc/NEWS3
3 files changed, 199 insertions, 3 deletions
diff --git a/Lib/imaplib.py b/Lib/imaplib.py
index df35cb1..f803a5c 100644
--- a/Lib/imaplib.py
+++ b/Lib/imaplib.py
@@ -1023,6 +1023,8 @@ class IMAP4:
raise self.abort('socket error: EOF')
# Protocol mandates all lines terminated by CRLF
+ if not line.endswith(b'\r\n'):
+ raise self.abort('socket error: unterminated line')
line = line[:-2]
if __debug__:
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index 2105fc2..fb5cf82 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -1,11 +1,31 @@
+from test import support
+# If we end up with a significant number of tests that don't require
+# threading, this test module should be split. Right now we skip
+# them all if we don't have threading.
+threading = support.import_module('threading')
+
+from contextlib import contextmanager
import imaplib
+import os.path
+import select
+import socket
+import socketserver
+import sys
import time
-from test import support
+from test.support import reap_threads, verbose
import unittest
+try:
+ import ssl
+except ImportError:
+ ssl = None
+
+CERTFILE = None
+
class TestImaplib(unittest.TestCase):
+
def test_that_Time2Internaldate_returns_a_result(self):
# We can check only that it successfully produces a result,
# not the correctness of the result itself, since the result
@@ -17,9 +37,180 @@ class TestImaplib(unittest.TestCase):
imaplib.Time2Internaldate(t)
+if ssl:
+
+ class SecureTCPServer(socketserver.TCPServer):
+
+ def get_request(self):
+ newsocket, fromaddr = self.socket.accept()
+ connstream = ssl.wrap_socket(newsocket,
+ server_side=True,
+ certfile=CERTFILE)
+ return connstream, fromaddr
+
+ IMAP4_SSL = imaplib.IMAP4_SSL
+
+else:
+
+ class SecureTCPServer:
+ pass
+
+ IMAP4_SSL = None
+
+
+class SimpleIMAPHandler(socketserver.StreamRequestHandler):
+
+ timeout = 1
+
+ def _send(self, message):
+ if verbose: print("SENT:", message.strip())
+ self.wfile.write(message)
+
+ def handle(self):
+ # Send a welcome message.
+ self._send(b'* OK IMAP4rev1\r\n')
+ while 1:
+ # Gather up input until we receive a line terminator or we timeout.
+ # Accumulate read(1) because it's simpler to handle the differences
+ # between naked sockets and SSL sockets.
+ line = b''
+ while 1:
+ try:
+ part = self.rfile.read(1)
+ if part == b'':
+ # Naked sockets return empty strings..
+ return
+ line += part
+ except IOError:
+ # ..but SSLSockets throw exceptions.
+ return
+ if line.endswith(b'\r\n'):
+ break
+
+ if verbose: print('GOT:', line.strip())
+ splitline = line.split()
+ tag = splitline[0].decode('ASCII')
+ cmd = splitline[1].decode('ASCII')
+ args = splitline[2:]
+
+ if hasattr(self, 'cmd_'+cmd):
+ getattr(self, 'cmd_'+cmd)(tag, args)
+ else:
+ self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
+
+ def cmd_CAPABILITY(self, tag, args):
+ self._send(b'* CAPABILITY IMAP4rev1\r\n')
+ self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+
+
+class BaseThreadedNetworkedTests(unittest.TestCase):
+
+ def make_server(self, addr, hdlr):
+
+ class MyServer(self.server_class):
+ def handle_error(self, request, client_address):
+ self.close_request(request)
+ self.server_close()
+ raise
+
+ if verbose: print("creating server")
+ server = MyServer(addr, hdlr)
+ self.assertEquals(server.server_address, server.socket.getsockname())
+
+ if verbose:
+ print("server created")
+ print("ADDR =", addr)
+ print("CLASS =", self.server_class)
+ print("HDLR =", server.RequestHandlerClass)
+
+ t = threading.Thread(
+ name='%s serving' % self.server_class,
+ target=server.serve_forever,
+ # Short poll interval to make the test finish quickly.
+ # Time between requests is short enough that we won't wake
+ # up spuriously too many times.
+ kwargs={'poll_interval':0.01})
+ t.daemon = True # In case this function raises.
+ t.start()
+ if verbose: print("server running")
+ return server, t
+
+ def reap_server(self, server, thread):
+ if verbose: print("waiting for server")
+ server.shutdown()
+ thread.join()
+ if verbose: print("done")
+
+ @contextmanager
+ def reaped_server(self, hdlr):
+ server, thread = self.make_server((support.HOST, 0), hdlr)
+ try:
+ yield server
+ finally:
+ self.reap_server(server, thread)
+
+ @reap_threads
+ def test_connect(self):
+ with self.reaped_server(SimpleIMAPHandler) as server:
+ client = self.imap_class(*server.server_address)
+ client.shutdown()
+
+ @reap_threads
+ def test_issue5949(self):
+
+ class EOFHandler(socketserver.StreamRequestHandler):
+ def handle(self):
+ # EOF without sending a complete welcome message.
+ self.wfile.write(b'* OK')
+
+ with self.reaped_server(EOFHandler) as server:
+ self.assertRaises(imaplib.IMAP4.abort,
+ self.imap_class, *server.server_address)
+
+ @reap_threads
+ def test_line_termination(self):
+
+ class BadNewlineHandler(SimpleIMAPHandler):
+
+ def cmd_CAPABILITY(self, tag, args):
+ self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
+ self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
+
+ with self.reaped_server(BadNewlineHandler) as server:
+ self.assertRaises(imaplib.IMAP4.abort,
+ self.imap_class, *server.server_address)
+
+
+
+class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
+
+ server_class = socketserver.TCPServer
+ imap_class = imaplib.IMAP4
+
+
+@unittest.skipUnless(ssl, "SSL not available")
+class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
+
+ server_class = SecureTCPServer
+ imap_class = IMAP4_SSL
+
+
def test_main():
- support.run_unittest(TestImaplib)
+
+ tests = [TestImaplib]
+
+ if support.is_resource_enabled('network'):
+ if ssl:
+ global CERTFILE
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
+ "keycert.pem")
+ if not os.path.exists(CERTFILE):
+ raise support.TestFailed("Can't read certificate files!")
+ tests.extend([ThreadedNetworkedTests, ThreadedNetworkedTestsSSL])
+
+ support.run_unittest(*tests)
if __name__ == "__main__":
- unittest.main()
+ support.use_resources = ['network']
+ test_main()
diff --git a/Misc/NEWS b/Misc/NEWS
index 700a1ae..d965a11 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -55,6 +55,9 @@ Core and Builtins
Library
-------
+- Issue #5949: added check for correct lineends in input from IMAP server
+ in imaplib.
+
- Fix variations of extending deques: d.extend(d) d.extendleft(d) d+=d
- Issue #6986: Fix crash in the JSON C accelerator when called with the