summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorBill Janssen <janssen@parc.com>2007-09-10 21:51:02 (GMT)
committerBill Janssen <janssen@parc.com>2007-09-10 21:51:02 (GMT)
commit98d19dafd9c9d95338887b9e53c77ec6960918e0 (patch)
tree1a21af6b4c5c2ba186ff8395663bc751fced6a94 /Lib
parenta0c05512ec071b98e8170c8cfe845bee6fc934da (diff)
downloadcpython-98d19dafd9c9d95338887b9e53c77ec6960918e0.zip
cpython-98d19dafd9c9d95338887b9e53c77ec6960918e0.tar.gz
cpython-98d19dafd9c9d95338887b9e53c77ec6960918e0.tar.bz2
More work on SSL support.
* Much expanded test suite: All protocols tested against all other protocols. All protocols tested with all certificate options. Tests for bad key and bad cert. Test of STARTTLS functionality. Test of RAND_* functions. * Fixes for threading/malloc bug. * Issue 1065 fixed: sslsocket class renamed to SSLSocket. sslerror class renamed to SSLError. Function "wrap_socket" now used to wrap an existing socket. * Issue 1583946 finally fixed: Support for subjectAltName added. Subject name now returned as proper DN list of RDNs. * SSLError exported from socket as "sslerror". * RAND_* functions properly exported from ssl.py. * Documentation improved: Example of how to create a self-signed certificate. Better indexing.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/httplib.py4
-rw-r--r--Lib/imaplib.py4
-rw-r--r--Lib/poplib.py2
-rwxr-xr-xLib/smtplib.py4
-rw-r--r--Lib/socket.py4
-rw-r--r--Lib/ssl.py49
-rw-r--r--Lib/test/badcert.pem36
-rw-r--r--Lib/test/badkey.pem40
-rw-r--r--Lib/test/nullcert.pem0
-rwxr-xr-xLib/test/regrtest.py10
-rw-r--r--Lib/test/test_socket_ssl.py2
-rw-r--r--Lib/test/test_ssl.py747
12 files changed, 668 insertions, 234 deletions
diff --git a/Lib/httplib.py b/Lib/httplib.py
index b926082..8dbe8a0 100644
--- a/Lib/httplib.py
+++ b/Lib/httplib.py
@@ -1051,7 +1051,7 @@ else:
"Connect to a host on a given (SSL) port."
sock = socket.create_connection((self.host, self.port), self.timeout)
- self.sock = ssl.sslsocket(sock, self.key_file, self.cert_file)
+ self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file)
__all__.append("HTTPSConnection")
@@ -1083,7 +1083,7 @@ else:
def FakeSocket (sock, sslobj):
warnings.warn("FakeSocket is deprecated, and won't be in 3.x. " +
- "Use the result of ssl.sslsocket directly instead.",
+ "Use the result of ssl.wrap_socket() directly instead.",
DeprecationWarning, stacklevel=2)
return sslobj
diff --git a/Lib/imaplib.py b/Lib/imaplib.py
index 5a7adf4..c05abb4 100644
--- a/Lib/imaplib.py
+++ b/Lib/imaplib.py
@@ -1147,7 +1147,7 @@ else:
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
- self.sslobj = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
+ self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
def read(self, size):
@@ -1199,7 +1199,7 @@ else:
def ssl(self):
"""Return SSLObject instance used to communicate with the IMAP4 server.
- ssl = ssl.sslsocket(<instance>.socket)
+ ssl = ssl.wrap_socket(<instance>.socket)
"""
return self.sslobj
diff --git a/Lib/poplib.py b/Lib/poplib.py
index c421529..149675a 100644
--- a/Lib/poplib.py
+++ b/Lib/poplib.py
@@ -348,7 +348,7 @@ else:
if not self.sock:
raise socket.error, msg
self.file = self.sock.makefile('rb')
- self.sslobj = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
+ self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
self._debugging = 0
self.welcome = self._getresp()
diff --git a/Lib/smtplib.py b/Lib/smtplib.py
index 5299848..5604241 100755
--- a/Lib/smtplib.py
+++ b/Lib/smtplib.py
@@ -587,7 +587,7 @@ class SMTP:
if resp == 220:
if not _have_ssl:
raise RuntimeError("No SSL support included in this Python")
- self.sock = ssl.sslsocket(self.sock, keyfile, certfile)
+ self.sock = ssl.wrap_socket(self.sock, keyfile, certfile)
self.file = SSLFakeFile(self.sock)
return (resp, reply)
@@ -720,7 +720,7 @@ if _have_ssl:
def _get_socket(self, host, port, timeout):
if self.debuglevel > 0: print>>stderr, 'connect:', (host, port)
self.sock = socket.create_connection((host, port), timeout)
- self.sock = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
+ self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
self.file = SSLFakeFile(self.sock)
__all__.append("SMTP_SSL")
diff --git a/Lib/socket.py b/Lib/socket.py
index 313151c..30be5c5 100644
--- a/Lib/socket.py
+++ b/Lib/socket.py
@@ -56,13 +56,13 @@ else:
# we do an internal import here because the ssl
# module imports the socket module
import ssl as _realssl
- warnings.warn("socket.ssl() is deprecated. Use ssl.sslsocket() instead.",
+ warnings.warn("socket.ssl() is deprecated. Use ssl.wrap_socket() instead.",
DeprecationWarning, stacklevel=2)
return _realssl.sslwrap_simple(sock, keyfile, certfile)
# we need to import the same constants we used to...
+ from _ssl import SSLError as sslerror
from _ssl import \
- sslerror, \
RAND_add, \
RAND_egd, \
RAND_status, \
diff --git a/Lib/ssl.py b/Lib/ssl.py
index 5ad9447..b987ec3 100644
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -6,11 +6,11 @@ This module provides some more Pythonic support for SSL.
Object types:
- sslsocket -- subtype of socket.socket which does SSL over the socket
+ SSLSocket -- subtype of socket.socket which does SSL over the socket
Exceptions:
- sslerror -- exception raised for I/O errors
+ SSLError -- exception raised for I/O errors
Functions:
@@ -58,9 +58,11 @@ PROTOCOL_TLSv1
import os, sys
import _ssl # if we can't import it, let the error propagate
-from _ssl import sslerror
+
+from _ssl import SSLError
from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1
+from _ssl import RAND_status, RAND_egd, RAND_add
from _ssl import \
SSL_ERROR_ZERO_RETURN, \
SSL_ERROR_WANT_READ, \
@@ -75,8 +77,20 @@ from _ssl import \
from socket import socket
from socket import getnameinfo as _getnameinfo
+def get_protocol_name (protocol_code):
+ if protocol_code == PROTOCOL_TLSv1:
+ return "TLSv1"
+ elif protocol_code == PROTOCOL_SSLv23:
+ return "SSLv23"
+ elif protocol_code == PROTOCOL_SSLv2:
+ return "SSLv2"
+ elif protocol_code == PROTOCOL_SSLv3:
+ return "SSLv3"
+ else:
+ return "<unknown>"
+
-class sslsocket (socket):
+class SSLSocket (socket):
"""This class implements a subtype of socket.socket that wraps
the underlying OS socket in an SSL context when necessary, and
@@ -119,14 +133,21 @@ class sslsocket (socket):
return self._sslobj.write(data)
- def getpeercert(self):
+ def getpeercert(self, binary_form=False):
"""Returns a formatted version of the data in the
certificate provided by the other end of the SSL channel.
Return None if no certificate was provided, {} if a
certificate was provided, but not validated."""
- return self._sslobj.peer_certificate()
+ return self._sslobj.peer_certificate(binary_form)
+
+ def cipher (self):
+
+ if not self._sslobj:
+ return None
+ else:
+ return self._sslobj.cipher()
def send (self, data, flags=0):
if self._sslobj:
@@ -197,7 +218,7 @@ class sslsocket (socket):
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._sslobj:
- raise ValueError("attempt to connect already-connected sslsocket!")
+ raise ValueError("attempt to connect already-connected SSLSocket!")
socket.connect(self, addr)
self._sslobj = _ssl.sslwrap(self._sock, False, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
@@ -210,10 +231,18 @@ class sslsocket (socket):
SSL channel, and the address of the remote client."""
newsock, addr = socket.accept(self)
- return (sslsocket(newsock, True, self.keyfile, self.certfile,
- self.cert_reqs, self.ssl_version,
- self.ca_certs), addr)
+ return (SSLSocket(newsock, True, self.keyfile, self.certfile,
+ self.cert_reqs, self.ssl_version,
+ self.ca_certs), addr)
+
+
+def wrap_socket(sock, keyfile=None, certfile=None,
+ server_side=False, cert_reqs=CERT_NONE,
+ ssl_version=PROTOCOL_SSLv23, ca_certs=None):
+ return SSLSocket(sock, keyfile=keyfile, certfile=certfile,
+ server_side=server_side, cert_reqs=cert_reqs,
+ ssl_version=ssl_version, ca_certs=ca_certs)
# some utility functions
diff --git a/Lib/test/badcert.pem b/Lib/test/badcert.pem
new file mode 100644
index 0000000..c419146
--- /dev/null
+++ b/Lib/test/badcert.pem
@@ -0,0 +1,36 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
+opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
+fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
+AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
+D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
+IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
+oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
+ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
+loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
+oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
+z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
+ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
+q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+Just bad cert data
+-----END CERTIFICATE-----
+-----BEGIN RSA PRIVATE KEY-----
+MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
+opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
+fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
+AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
+D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
+IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
+oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
+ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
+loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
+oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
+z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
+ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
+q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+Just bad cert data
+-----END CERTIFICATE-----
diff --git a/Lib/test/badkey.pem b/Lib/test/badkey.pem
new file mode 100644
index 0000000..1c8a955
--- /dev/null
+++ b/Lib/test/badkey.pem
@@ -0,0 +1,40 @@
+-----BEGIN RSA PRIVATE KEY-----
+Bad Key, though the cert should be OK
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
+VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
+IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
+U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
+NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
+bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
+dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
+aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
+m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
+M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
+fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
+AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
+08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
+CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
+iHkC6gGdBJhogs4=
+-----END CERTIFICATE-----
+-----BEGIN RSA PRIVATE KEY-----
+Bad Key, though the cert should be OK
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
+VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
+IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
+U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
+NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
+bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
+dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
+aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
+m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
+M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
+fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
+AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
+08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
+CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
+iHkC6gGdBJhogs4=
+-----END CERTIFICATE-----
diff --git a/Lib/test/nullcert.pem b/Lib/test/nullcert.pem
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Lib/test/nullcert.pem
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 99803ff..7db63cb 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -1108,7 +1108,6 @@ _expectations['freebsd7'] = _expectations['freebsd4']
class _ExpectedSkips:
def __init__(self):
import os.path
- from test import test_socket_ssl
from test import test_timeout
self.valid = False
@@ -1122,8 +1121,13 @@ class _ExpectedSkips:
if not os.path.supports_unicode_filenames:
self.expected.add('test_pep277')
- if test_socket_ssl.skip_expected:
- self.expected.add('test_socket_ssl')
+ try:
+ from test import test_socket_ssl
+ except ImportError:
+ pass
+ else:
+ if test_socket_ssl.skip_expected:
+ self.expected.add('test_socket_ssl')
if test_timeout.skip_expected:
self.expected.add('test_timeout')
diff --git a/Lib/test/test_socket_ssl.py b/Lib/test/test_socket_ssl.py
index f7c1aa1..d6a54bb 100644
--- a/Lib/test/test_socket_ssl.py
+++ b/Lib/test/test_socket_ssl.py
@@ -115,7 +115,7 @@ class BasicTests(unittest.TestCase):
s = socket.socket(socket.AF_INET)
s.connect(("www.sf.net", 443))
fd = s._sock.fileno()
- sock = ssl.sslsocket(s)
+ sock = ssl.wrap_socket(s)
s = None
sock.close()
try:
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 04daab2..7eedbdc 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -5,7 +5,6 @@ import unittest
from test import test_support
import socket
import errno
-import threading
import subprocess
import time
import os
@@ -23,57 +22,21 @@ except ImportError:
CERTFILE = None
+TESTPORT = 10025
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
- sys.stdout.write(prefix + exc_format)
+ if test_support.verbose:
+ sys.stdout.write(prefix + exc_format)
class BasicTests(unittest.TestCase):
- def testRudeShutdown(self):
- # Some random port to connect to.
- PORT = [9934]
-
- listener_ready = threading.Event()
- listener_gone = threading.Event()
-
- # `listener` runs in a thread. It opens a socket listening on
- # PORT, and sits in an accept() until the main thread connects.
- # Then it rudely closes the socket, and sets Event `listener_gone`
- # to let the main thread know the socket is gone.
- def listener():
- s = socket.socket()
- PORT[0] = test_support.bind_port(s, '', PORT[0])
- s.listen(5)
- listener_ready.set()
- s.accept()
- s = None # reclaim the socket object, which also closes it
- listener_gone.set()
-
- def connector():
- listener_ready.wait()
- s = socket.socket()
- s.connect(('localhost', PORT[0]))
- listener_gone.wait()
- try:
- ssl_sock = socket.ssl(s)
- except socket.sslerror:
- pass
- else:
- raise test_support.TestFailed(
- 'connecting to closed SSL socket should have failed')
-
- t = threading.Thread(target=listener)
- t.start()
- connector()
- t.join()
-
def testSSLconnect(self):
import os
with test_support.transient_internet():
- s = ssl.sslsocket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE)
+ s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE)
s.connect(("pop.gmail.com", 995))
c = s.getpeercert()
if c:
@@ -81,177 +44,551 @@ class BasicTests(unittest.TestCase):
s.close()
# this should fail because we have no verification certs
- s = ssl.sslsocket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED)
+ s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("pop.gmail.com", 995))
- except ssl.sslerror:
+ except ssl.SSLError:
pass
finally:
s.close()
-class ConnectedTests(unittest.TestCase):
-
- def testTLSecho (self):
-
- s1 = socket.socket()
+ def testCrucialConstants(self):
+ ssl.PROTOCOL_SSLv2
+ ssl.PROTOCOL_SSLv23
+ ssl.PROTOCOL_SSLv3
+ ssl.PROTOCOL_TLSv1
+ ssl.CERT_NONE
+ ssl.CERT_OPTIONAL
+ ssl.CERT_REQUIRED
+
+ def testRAND(self):
+ v = ssl.RAND_status()
+ if test_support.verbose:
+ sys.stdout.write("\n RAND_status is %d (%s)\n"
+ % (v, (v and "sufficient randomness") or
+ "insufficient randomness"))
try:
- s1.connect(('127.0.0.1', 10024))
- except:
- handle_error("connection failure:\n")
- raise test_support.TestFailed("Can't connect to test server")
+ ssl.RAND_egd(1)
+ except TypeError:
+ pass
else:
- try:
- c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
- except:
- handle_error("SSL handshake failure:\n")
- raise test_support.TestFailed("Can't SSL-handshake with test server")
- else:
- if not c1:
- raise test_support.TestFailed("Can't SSL-handshake with test server")
- indata = "FOO\n"
- c1.write(indata)
- outdata = c1.read()
- if outdata != indata.lower():
- raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
- c1.close()
-
- def testReadCert(self):
+ print "didn't raise TypeError"
+ ssl.RAND_add("this is a random string", 75.0)
+
+ def testParseCert(self):
+ # note that this uses an 'unofficial' function in _ssl.c,
+ # provided solely for this test, to exercise the certificate
+ # parsing code
+ p = ssl._ssl._test_decode_cert(CERTFILE, False)
+ if test_support.verbose:
+ sys.stdout.write("\n" + pprint.pformat(p) + "\n")
- s2 = socket.socket()
- try:
- s2.connect(('127.0.0.1', 10024))
- except:
- handle_error("connection failure:\n")
- raise test_support.TestFailed("Can't connect to test server")
- else:
- try:
- c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
- cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
- except:
- handle_error("SSL handshake failure:\n")
- raise test_support.TestFailed("Can't SSL-handshake with test server")
- else:
- if not c2:
- raise test_support.TestFailed("Can't SSL-handshake with test server")
- cert = c2.getpeercert()
- if not cert:
- raise test_support.TestFailed("Can't get peer certificate.")
- if test_support.verbose:
- sys.stdout.write(pprint.pformat(cert) + '\n')
- if not cert.has_key('subject'):
- raise test_support.TestFailed(
- "No subject field in certificate: %s." %
- pprint.pformat(cert))
- if not ('organizationName', 'Python Software Foundation') in cert['subject']:
- raise test_support.TestFailed(
- "Missing or invalid 'organizationName' field in certificate subject; "
- "should be 'Python Software Foundation'.");
- c2.close()
+try:
+ import threading
+except ImportError:
+ _have_threads = False
+else:
+ _have_threads = True
-class ThreadedEchoServer(threading.Thread):
+ class ThreadedEchoServer(threading.Thread):
- class ConnectionHandler(threading.Thread):
+ class ConnectionHandler(threading.Thread):
- def __init__(self, server, connsock):
- self.server = server
- self.running = False
- self.sock = connsock
- threading.Thread.__init__(self)
- self.setDaemon(True)
+ """A mildly complicated class, because we want it to work both
+ with and without the SSL wrapper around the socket connection, so
+ that we can test the STARTTLS functionality."""
- def run (self):
- self.running = True
- try:
- sslconn = ssl.sslsocket(self.sock, server_side=True,
- certfile=self.server.certificate,
- ssl_version=self.server.protocol,
- cert_reqs=self.server.certreqs)
- except:
- # here, we want to stop the server, because this shouldn't
- # happen in the context of our test case
- handle_error("Test server failure:\n")
+ def __init__(self, server, connsock):
+ self.server = server
self.running = False
- # normally, we'd just stop here, but for the test
- # harness, we want to stop the server
- self.server.stop()
- return
+ self.sock = connsock
+ self.sock.setblocking(1)
+ self.sslconn = None
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
- while self.running:
+ def wrap_conn (self):
try:
- msg = sslconn.read()
- if not msg:
- # eof, so quit this handler
+ self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
+ certfile=self.server.certificate,
+ ssl_version=self.server.protocol,
+ ca_certs=self.server.cacerts,
+ cert_reqs=self.server.certreqs)
+ except:
+ if self.server.chatty:
+ handle_error("\n server: bad connection attempt from " +
+ str(self.sock.getpeername()) + ":\n")
+ if not self.server.expect_bad_connects:
+ # here, we want to stop the server, because this shouldn't
+ # happen in the context of our test case
self.running = False
- sslconn.close()
- elif msg.strip() == 'over':
- sslconn.close()
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
self.server.stop()
+ return False
+
+ else:
+ if self.server.certreqs == ssl.CERT_REQUIRED:
+ cert = self.sslconn.getpeercert()
+ if test_support.verbose and self.server.chatty:
+ sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
+ cert_binary = self.sslconn.getpeercert(True)
+ if test_support.verbose and self.server.chatty:
+ sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
+ cipher = self.sslconn.cipher()
+ if test_support.verbose and self.server.chatty:
+ sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
+ return True
+
+ def read(self):
+ if self.sslconn:
+ return self.sslconn.read()
+ else:
+ return self.sock.recv(1024)
+
+ def write(self, bytes):
+ if self.sslconn:
+ return self.sslconn.write(bytes)
+ else:
+ return self.sock.send(bytes)
+
+ def close(self):
+ if self.sslconn:
+ self.sslconn.close()
+ else:
+ self.sock.close()
+
+ def run (self):
+ self.running = True
+ if not self.server.starttls_server:
+ if not self.wrap_conn():
+ return
+ while self.running:
+ try:
+ msg = self.read()
+ if not msg:
+ # eof, so quit this handler
+ self.running = False
+ self.close()
+ elif msg.strip() == 'over':
+ if test_support.verbose and self.server.connectionchatty:
+ sys.stdout.write(" server: client closed connection\n")
+ self.close()
+ return
+ elif self.server.starttls_server and msg.strip() == 'STARTTLS':
+ if test_support.verbose and self.server.connectionchatty:
+ sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
+ self.write("OK\n")
+ if not self.wrap_conn():
+ return
+ else:
+ if (test_support.verbose and
+ self.server.connectionchatty):
+ ctype = (self.sslconn and "encrypted") or "unencrypted"
+ sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
+ % (repr(msg), ctype, repr(msg.lower()), ctype))
+ self.write(msg.lower())
+ except ssl.SSLError:
+ if self.server.chatty:
+ handle_error("Test server failure:\n")
+ self.close()
self.running = False
- else:
- if test_support.verbose:
- sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
- sslconn.write(msg.lower())
- except ssl.sslerror:
- handle_error("Test server failure:\n")
- sslconn.close()
- self.running = False
- # normally, we'd just stop here, but for the test
- # harness, we want to stop the server
- self.server.stop()
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
+ self.server.stop()
+ except:
+ handle_error('')
+
+ def __init__(self, port, certificate, ssl_version=None,
+ certreqs=None, cacerts=None, expect_bad_connects=False,
+ chatty=True, connectionchatty=False, starttls_server=False):
+ if ssl_version is None:
+ ssl_version = ssl.PROTOCOL_TLSv1
+ if certreqs is None:
+ certreqs = ssl.CERT_NONE
+ self.certificate = certificate
+ self.protocol = ssl_version
+ self.certreqs = certreqs
+ self.cacerts = cacerts
+ self.expect_bad_connects = expect_bad_connects
+ self.chatty = chatty
+ self.connectionchatty = connectionchatty
+ self.starttls_server = starttls_server
+ self.sock = socket.socket()
+ self.flag = None
+ if hasattr(socket, 'SO_REUSEADDR'):
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ if hasattr(socket, 'SO_REUSEPORT'):
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ self.sock.bind(('127.0.0.1', port))
+ self.active = False
+ threading.Thread.__init__(self)
+ self.setDaemon(False)
+
+ def start (self, flag=None):
+ self.flag = flag
+ threading.Thread.start(self)
+
+ def run (self):
+ self.sock.settimeout(0.5)
+ self.sock.listen(5)
+ self.active = True
+ if self.flag:
+ # signal an event
+ self.flag.set()
+ while self.active:
+ try:
+ newconn, connaddr = self.sock.accept()
+ if test_support.verbose and self.chatty:
+ sys.stdout.write(' server: new connection from '
+ + str(connaddr) + '\n')
+ handler = self.ConnectionHandler(self, newconn)
+ handler.start()
+ except socket.timeout:
+ pass
+ except KeyboardInterrupt:
+ self.stop()
except:
- handle_error('')
-
- def __init__(self, port, certificate, ssl_version=None,
- certreqs=None, cacerts=None):
- if ssl_version is None:
- ssl_version = ssl.PROTOCOL_TLSv1
- if certreqs is None:
- certreqs = ssl.CERT_NONE
- self.certificate = certificate
- self.protocol = ssl_version
- self.certreqs = certreqs
- self.cacerts = cacerts
- self.sock = socket.socket()
- self.flag = None
- if hasattr(socket, 'SO_REUSEADDR'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.sock.bind(('127.0.0.1', port))
- self.active = False
- threading.Thread.__init__(self)
- self.setDaemon(False)
-
- def start (self, flag=None):
- self.flag = flag
- threading.Thread.start(self)
-
- def run (self):
- self.sock.settimeout(0.5)
- self.sock.listen(5)
- self.active = True
- if self.flag:
- # signal an event
- self.flag.set()
- while self.active:
+ if self.chatty:
+ handle_error("Test server failure:\n")
+
+ def stop (self):
+ self.active = False
+ self.sock.close()
+
+ def badCertTest (certfile):
+ server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ certreqs=ssl.CERT_REQUIRED,
+ cacerts=CERTFILE, chatty=False)
+ flag = threading.Event()
+ server.start(flag)
+ # wait for it to start
+ flag.wait()
+ # try to connect
+ try:
try:
- newconn, connaddr = self.sock.accept()
+ s = ssl.wrap_socket(socket.socket(),
+ certfile=certfile,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+ s.connect(('127.0.0.1', TESTPORT))
+ except ssl.SSLError, x:
if test_support.verbose:
- sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
- handler = self.ConnectionHandler(self, newconn)
- handler.start()
- except socket.timeout:
- pass
- except KeyboardInterrupt:
- self.stop()
- except:
- handle_error("Test server failure:\n")
+ sys.stdout.write("\nSSLError is %s\n" % x[1])
+ else:
+ raise test_support.TestFailed(
+ "Use of invalid cert should have failed!")
+ finally:
+ server.stop()
+ server.join()
+
+ def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
+ client_certfile, client_protocol=None, indata="FOO\n",
+ chatty=True, connectionchatty=False):
+
+ server = ThreadedEchoServer(TESTPORT, certfile,
+ certreqs=certreqs,
+ ssl_version=protocol,
+ cacerts=cacertsfile,
+ chatty=chatty,
+ connectionchatty=connectionchatty)
+ flag = threading.Event()
+ server.start(flag)
+ # wait for it to start
+ flag.wait()
+ # try to connect
+ if client_protocol is None:
+ client_protocol = protocol
+ try:
+ try:
+ s = ssl.wrap_socket(socket.socket(),
+ certfile=client_certfile,
+ ca_certs=cacertsfile,
+ cert_reqs=certreqs,
+ ssl_version=client_protocol)
+ s.connect(('127.0.0.1', TESTPORT))
+ except ssl.SSLError, x:
+ raise test_support.TestFailed("Unexpected SSL error: " + str(x))
+ except Exception, x:
+ raise test_support.TestFailed("Unexpected exception: " + str(x))
+ else:
+ if connectionchatty:
+ if test_support.verbose:
+ sys.stdout.write(
+ " client: sending %s...\n" % (repr(indata)))
+ s.write(indata)
+ outdata = s.read()
+ if connectionchatty:
+ if test_support.verbose:
+ sys.stdout.write(" client: read %s\n" % repr(outdata))
+ if outdata != indata.lower():
+ raise test_support.TestFailed(
+ "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
+ % (outdata[:min(len(outdata),20)], len(outdata),
+ indata[:min(len(indata),20)].lower(), len(indata)))
+ s.write("over\n")
+ if connectionchatty:
+ if test_support.verbose:
+ sys.stdout.write(" client: closing connection.\n")
+ s.ssl_shutdown()
+ s.close()
+ finally:
+ server.stop()
+ server.join()
+
+ def tryProtocolCombo (server_protocol,
+ client_protocol,
+ expectedToWork,
+ certsreqs=ssl.CERT_NONE):
+
+ if certsreqs == ssl.CERT_NONE:
+ certtype = "CERT_NONE"
+ elif certsreqs == ssl.CERT_OPTIONAL:
+ certtype = "CERT_OPTIONAL"
+ elif certsreqs == ssl.CERT_REQUIRED:
+ certtype = "CERT_REQUIRED"
+ if test_support.verbose:
+ formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
+ sys.stdout.write(formatstr %
+ (ssl.get_protocol_name(client_protocol),
+ ssl.get_protocol_name(server_protocol),
+ certtype))
+ try:
+ serverParamsTest(CERTFILE, server_protocol, certsreqs,
+ CERTFILE, CERTFILE, client_protocol, chatty=False)
+ except test_support.TestFailed:
+ if expectedToWork:
+ raise
+ else:
+ if not expectedToWork:
+ raise test_support.TestFailed(
+ "Client protocol %s succeeded with server protocol %s!"
+ % (ssl.get_protocol_name(client_protocol),
+ ssl.get_protocol_name(server_protocol)))
+
+
+ class ConnectedTests(unittest.TestCase):
+
+ def testRudeShutdown(self):
+
+ listener_ready = threading.Event()
+ listener_gone = threading.Event()
+
+ # `listener` runs in a thread. It opens a socket listening on
+ # PORT, and sits in an accept() until the main thread connects.
+ # Then it rudely closes the socket, and sets Event `listener_gone`
+ # to let the main thread know the socket is gone.
+ def listener():
+ s = socket.socket()
+ if hasattr(socket, 'SO_REUSEPORT'):
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ port = test_support.bind_port(s, 'localhost', TESTPORT)
+ s.listen(5)
+ listener_ready.set()
+ s.accept()
+ s = None # reclaim the socket object, which also closes it
+ listener_gone.set()
+
+ def connector():
+ listener_ready.wait()
+ s = socket.socket()
+ s.connect(('localhost', TESTPORT))
+ listener_gone.wait()
+ try:
+ ssl_sock = ssl.wrap_socket(s)
+ except socket.sslerror:
+ pass
+ else:
+ raise test_support.TestFailed(
+ 'connecting to closed SSL socket should have failed')
+
+ t = threading.Thread(target=listener)
+ t.start()
+ connector()
+ t.join()
+
+ def testEcho (self):
+
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
+ CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
+ chatty=True, connectionchatty=True)
+
+ def testReadCert(self):
+
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ s2 = socket.socket()
+ server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ certreqs=ssl.CERT_NONE,
+ ssl_version=ssl.PROTOCOL_SSLv23,
+ cacerts=CERTFILE,
+ chatty=False)
+ flag = threading.Event()
+ server.start(flag)
+ # wait for it to start
+ flag.wait()
+ # try to connect
+ try:
+ try:
+ s = ssl.wrap_socket(socket.socket(),
+ certfile=CERTFILE,
+ ca_certs=CERTFILE,
+ cert_reqs=ssl.CERT_REQUIRED,
+ ssl_version=ssl.PROTOCOL_SSLv23)
+ s.connect(('127.0.0.1', TESTPORT))
+ except ssl.SSLError, x:
+ raise test_support.TestFailed(
+ "Unexpected SSL error: " + str(x))
+ except Exception, x:
+ raise test_support.TestFailed(
+ "Unexpected exception: " + str(x))
+ else:
+ if not s:
+ raise test_support.TestFailed(
+ "Can't SSL-handshake with test server")
+ cert = s.getpeercert()
+ if not cert:
+ raise test_support.TestFailed(
+ "Can't get peer certificate.")
+ cipher = s.cipher()
+ if test_support.verbose:
+ sys.stdout.write(pprint.pformat(cert) + '\n')
+ sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
+ if not cert.has_key('subject'):
+ raise test_support.TestFailed(
+ "No subject field in certificate: %s." %
+ pprint.pformat(cert))
+ if ((('organizationName', 'Python Software Foundation'),)
+ not in cert['subject']):
+ raise test_support.TestFailed(
+ "Missing or invalid 'organizationName' field in certificate subject; "
+ "should be 'Python Software Foundation'.");
+ s.close()
+ finally:
+ server.stop()
+ server.join()
+
+ def testNULLcert(self):
+ badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
+ "nullcert.pem"))
+ def testMalformedCert(self):
+ badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
+ "badcert.pem"))
+ def testMalformedKey(self):
+ badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
+ "badkey.pem"))
+
+ def testProtocolSSL2(self):
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
+
+ def testProtocolSSL23(self):
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ try:
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
+ except test_support.TestFailed, x:
+ # this fails on some older versions of OpenSSL (0.9.7l, for instance)
+ if test_support.verbose:
+ sys.stdout.write(
+ " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
+ % str(x))
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
+
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
+
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
+
+ def testProtocolSSL3(self):
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
+ tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
+
+ def testProtocolTLS1(self):
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
+ tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
+ tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
+ tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
+ tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
+ tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
+
+ def testSTARTTLS (self):
+
+ msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
+
+ server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ ssl_version=ssl.PROTOCOL_TLSv1,
+ starttls_server=True,
+ chatty=True,
+ connectionchatty=True)
+ flag = threading.Event()
+ server.start(flag)
+ # wait for it to start
+ flag.wait()
+ # try to connect
+ wrapped = False
+ try:
+ try:
+ s = socket.socket()
+ s.setblocking(1)
+ s.connect(('127.0.0.1', TESTPORT))
+ except Exception, x:
+ raise test_support.TestFailed("Unexpected exception: " + str(x))
+ else:
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ for indata in msgs:
+ if test_support.verbose:
+ sys.stdout.write(" client: sending %s...\n" % repr(indata))
+ if wrapped:
+ conn.write(indata)
+ outdata = conn.read()
+ else:
+ s.send(indata)
+ outdata = s.recv(1024)
+ if indata == "STARTTLS" and outdata.strip().lower().startswith("ok"):
+ if test_support.verbose:
+ sys.stdout.write(" client: read %s from server, starting TLS...\n" % repr(outdata))
+ conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
+
+ wrapped = True
+ else:
+ if test_support.verbose:
+ sys.stdout.write(" client: read %s from server\n" % repr(outdata))
+ if test_support.verbose:
+ sys.stdout.write(" client: closing connection.\n")
+ if wrapped:
+ conn.write("over\n")
+ conn.ssl_shutdown()
+ else:
+ s.send("over\n")
+ s.close()
+ finally:
+ server.stop()
+ server.join()
- def stop (self):
- self.active = False
- self.sock.close()
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server
@@ -337,33 +674,21 @@ def test_main(verbose=False):
global CERTFILE
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
- "keycert.pem")
- if not CERTFILE:
- sys.__stdout__.write("Skipping test_ssl ConnectedTests; "
- "couldn't create a certificate.\n")
+ "keycert.pem")
+ if (not os.path.exists(CERTFILE)):
+ raise test_support.TestFailed("Can't read certificate files!")
tests = [BasicTests]
- server = None
- if CERTFILE and test_support.is_resource_enabled('network'):
- server = ThreadedEchoServer(10024, CERTFILE)
- flag = threading.Event()
- server.start(flag)
- # wait for it to start
- flag.wait()
- tests.append(ConnectedTests)
-
- thread_info = test_support.threading_setup()
+ if _have_threads:
+ thread_info = test_support.threading_setup()
+ if CERTFILE and thread_info and test_support.is_resource_enabled('network'):
+ tests.append(ConnectedTests)
- try:
- test_support.run_unittest(*tests)
- finally:
- if server is not None and server.active:
- server.stop()
- # wait for it to stop
- server.join()
+ test_support.run_unittest(*tests)
- test_support.threading_cleanup(*thread_info)
+ if _have_threads:
+ test_support.threading_cleanup(*thread_info)
if __name__ == "__main__":
test_main()