summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_poplib.py
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-11-23 19:13:48 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-11-23 19:13:48 (GMT)
commit8618d7457b1a648a8183966d930077e5754dc0ed (patch)
treec5fba1246a6af3cb2a700ac1cdbc970bc19f6940 /Lib/test/test_poplib.py
parent25cee19beb3117f834100cf29cff625c98d0da29 (diff)
downloadcpython-8618d7457b1a648a8183966d930077e5754dc0ed.zip
cpython-8618d7457b1a648a8183966d930077e5754dc0ed.tar.gz
cpython-8618d7457b1a648a8183966d930077e5754dc0ed.tar.bz2
Issue #4473: Add a POP3.stls() to switch a clear-text POP3 session into an encrypted POP3 session, on supported servers.
Patch by Lorenzo Catucci.
Diffstat (limited to 'Lib/test/test_poplib.py')
-rw-r--r--Lib/test/test_poplib.py145
1 files changed, 114 insertions, 31 deletions
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 8fe0c4cb..e85dd2a 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -18,6 +18,13 @@ threading = test_support.import_module('threading')
HOST = test_support.HOST
PORT = 0
+SUPPORTS_SSL = False
+if hasattr(poplib, 'POP3_SSL'):
+ import ssl
+
+ SUPPORTS_SSL = True
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
+
# the dummy data returned by server when LIST and RETR commands are issued
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
RETR_RESP = b"""From: postmaster@python.org\
@@ -40,6 +47,8 @@ class DummyPOP3Handler(asynchat.async_chat):
self.set_terminator(b"\r\n")
self.in_buffer = []
self.push('+OK dummy pop3 server ready. <timestamp>')
+ self.tls_active = False
+ self.tls_starting = False
def collect_incoming_data(self, data):
self.in_buffer.append(data)
@@ -114,16 +123,65 @@ class DummyPOP3Handler(asynchat.async_chat):
self.push('+OK closing.')
self.close_when_done()
+ def _get_capas(self):
+ _capas = dict(self.CAPAS)
+ if not self.tls_active and SUPPORTS_SSL:
+ _capas['STLS'] = []
+ return _capas
+
def cmd_capa(self, arg):
self.push('+OK Capability list follows')
- if self.CAPAS:
- for cap, params in self.CAPAS.items():
+ if self._get_capas():
+ for cap, params in self._get_capas().items():
_ln = [cap]
if params:
_ln.extend(params)
self.push(' '.join(_ln))
self.push('.')
+ if SUPPORTS_SSL:
+
+ def cmd_stls(self, arg):
+ if self.tls_active is False:
+ self.push('+OK Begin TLS negotiation')
+ tls_sock = ssl.wrap_socket(self.socket, certfile=CERTFILE,
+ server_side=True,
+ do_handshake_on_connect=False,
+ suppress_ragged_eofs=False)
+ self.del_channel()
+ self.set_socket(tls_sock)
+ self.tls_active = True
+ self.tls_starting = True
+ self.in_buffer = []
+ self._do_tls_handshake()
+ else:
+ self.push('-ERR Command not permitted when TLS active')
+
+ def _do_tls_handshake(self):
+ try:
+ self.socket.do_handshake()
+ except ssl.SSLError as err:
+ if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
+ ssl.SSL_ERROR_WANT_WRITE):
+ return
+ elif err.args[0] == ssl.SSL_ERROR_EOF:
+ return self.handle_close()
+ raise
+ except socket.error as err:
+ if err.args[0] == errno.ECONNABORTED:
+ return self.handle_close()
+ else:
+ self.tls_active = True
+ self.tls_starting = False
+
+ def handle_read(self):
+ if self.tls_starting:
+ self._do_tls_handshake()
+ else:
+ try:
+ asynchat.async_chat.handle_read(self)
+ except ssl.SSLEOFError:
+ self.handle_close()
class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
@@ -254,13 +312,25 @@ class TestPOP3Class(TestCase):
self.assertIsNone(self.client.sock)
self.assertIsNone(self.client.file)
+ if SUPPORTS_SSL:
-SUPPORTS_SSL = False
-if hasattr(poplib, 'POP3_SSL'):
- import ssl
+ def test_stls_capa(self):
+ capa = self.client.capa()
+ self.assertTrue('STLS' in capa.keys())
- SUPPORTS_SSL = True
- CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
+ def test_stls(self):
+ expected = b'+OK Begin TLS negotiation'
+ resp = self.client.stls()
+ self.assertEqual(resp, expected)
+
+ def test_stls_context(self):
+ expected = b'+OK Begin TLS negotiation'
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ resp = self.client.stls(context=ctx)
+ self.assertEqual(resp, expected)
+
+
+if SUPPORTS_SSL:
class DummyPOP3_SSLHandler(DummyPOP3Handler):
@@ -272,34 +342,13 @@ if hasattr(poplib, 'POP3_SSL'):
self.del_channel()
self.set_socket(ssl_socket)
# Must try handshake before calling push()
- self._ssl_accepting = True
- self._do_ssl_handshake()
+ self.tls_active = True
+ self.tls_starting = True
+ self._do_tls_handshake()
self.set_terminator(b"\r\n")
self.in_buffer = []
self.push('+OK dummy pop3 server ready. <timestamp>')
- def _do_ssl_handshake(self):
- try:
- self.socket.do_handshake()
- except ssl.SSLError as err:
- if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
- ssl.SSL_ERROR_WANT_WRITE):
- return
- elif err.args[0] == ssl.SSL_ERROR_EOF:
- return self.handle_close()
- raise
- except socket.error as err:
- if err.args[0] == errno.ECONNABORTED:
- return self.handle_close()
- else:
- self._ssl_accepting = False
-
- def handle_read(self):
- if self._ssl_accepting:
- self._do_ssl_handshake()
- else:
- DummyPOP3Handler.handle_read(self)
-
class TestPOP3_SSLClass(TestPOP3Class):
# repeat previous tests by using poplib.POP3_SSL
@@ -330,6 +379,39 @@ if hasattr(poplib, 'POP3_SSL'):
self.assertIs(self.client.sock.context, ctx)
self.assertTrue(self.client.noop().startswith(b'+OK'))
+ def test_stls(self):
+ self.assertRaises(poplib.error_proto, self.client.stls)
+
+ test_stls_context = test_stls
+
+ def test_stls_capa(self):
+ capa = self.client.capa()
+ self.assertFalse('STLS' in capa.keys())
+
+
+ class TestPOP3_TLSClass(TestPOP3Class):
+ # repeat previous tests by using poplib.POP3.stls()
+
+ def setUp(self):
+ self.server = DummyPOP3Server((HOST, PORT))
+ self.server.start()
+ self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
+ self.client.stls()
+
+ def tearDown(self):
+ if self.client.file is not None and self.client.sock is not None:
+ self.client.quit()
+ self.server.stop()
+
+ def test_stls(self):
+ self.assertRaises(poplib.error_proto, self.client.stls)
+
+ test_stls_context = test_stls
+
+ def test_stls_capa(self):
+ capa = self.client.capa()
+ self.assertFalse(b'STLS' in capa.keys())
+
class TestTimeouts(TestCase):
@@ -389,6 +471,7 @@ def test_main():
tests = [TestPOP3Class, TestTimeouts]
if SUPPORTS_SSL:
tests.append(TestPOP3_SSLClass)
+ tests.append(TestPOP3_TLSClass)
thread_info = test_support.threading_setup()
try:
test_support.run_unittest(*tests)