summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBill Janssen <janssen@parc.com>2008-09-08 16:45:19 (GMT)
committerBill Janssen <janssen@parc.com>2008-09-08 16:45:19 (GMT)
commit58afe4c194b2934beda995840d3f5b3a0aafc3fa (patch)
tree8482f1bfb3f8ed9255aaa8af5e7dfe3d429b2ce7
parentf9ee5b4cffdcd79bc2edac84eaf9887cb2c303fc (diff)
downloadcpython-58afe4c194b2934beda995840d3f5b3a0aafc3fa.zip
cpython-58afe4c194b2934beda995840d3f5b3a0aafc3fa.tar.gz
cpython-58afe4c194b2934beda995840d3f5b3a0aafc3fa.tar.bz2
fixes from issue 3162 for SSL module
-rw-r--r--Lib/ssl.py8
-rw-r--r--Lib/test/test_ssl.py126
2 files changed, 134 insertions, 0 deletions
diff --git a/Lib/ssl.py b/Lib/ssl.py
index aa301295..cd54437 100644
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -284,6 +284,14 @@ class SSLSocket(socket):
else:
return socket.recvfrom(self, addr, buflen, flags)
+ def recvfrom_into(self, buffer, nbytes=None, flags=0):
+ self._checkClosed()
+ if self._sslobj:
+ raise ValueError("recvfrom_into not allowed on instances of %s" %
+ self.__class__)
+ else:
+ return socket.recvfrom_into(self, buffer, nbytes, flags)
+
def pending(self):
self._checkClosed()
if self._sslobj:
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 1ad6f02..9d59a26 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -781,6 +781,9 @@ else:
def testMalformedCert(self):
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
"badcert.pem"))
+ def testWrongCert(self):
+ badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
+ "wrongcert.pem"))
def testMalformedKey(self):
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
"badkey.pem"))
@@ -1033,6 +1036,129 @@ else:
server.stop()
server.join()
+ def testAllRecvAndSendMethods(self):
+
+ if support.verbose:
+ sys.stdout.write("\n")
+
+ server = ThreadedEchoServer(CERTFILE,
+ certreqs=ssl.CERT_NONE,
+ ssl_version=ssl.PROTOCOL_TLSv1,
+ cacerts=CERTFILE,
+ chatty=True,
+ connectionchatty=False)
+ flag = threading.Event()
+ server.start(flag)
+ # wait for it to start
+ flag.wait()
+ # try to connect
+ try:
+ s = ssl.wrap_socket(socket.socket(),
+ server_side=False,
+ certfile=CERTFILE,
+ ca_certs=CERTFILE,
+ cert_reqs=ssl.CERT_NONE,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+ s.connect((HOST, server.port))
+ except ssl.SSLError as x:
+ raise support.TestFailed("Unexpected SSL error: " + str(x))
+ except Exception as x:
+ raise support.TestFailed("Unexpected exception: " + str(x))
+ else:
+ # helper methods for standardising recv* method signatures
+ def _recv_into():
+ b = bytearray(b"\0"*100)
+ count = s.recv_into(b)
+ return b[:count]
+
+ def _recvfrom_into():
+ b = bytearray(b"\0"*100)
+ count, addr = s.recvfrom_into(b)
+ return b[:count]
+
+ # (name, method, whether to expect success, *args)
+ send_methods = [
+ ('send', s.send, True, []),
+ ('sendto', s.sendto, False, ["some.address"]),
+ ('sendall', s.sendall, True, []),
+ ]
+ recv_methods = [
+ ('recv', s.recv, True, []),
+ ('recvfrom', s.recvfrom, False, ["some.address"]),
+ ('recv_into', _recv_into, True, []),
+ ('recvfrom_into', _recvfrom_into, False, []),
+ ]
+ data_prefix = "PREFIX_"
+
+ for meth_name, send_meth, expect_success, args in send_methods:
+ indata = data_prefix + meth_name
+ try:
+ send_meth(indata.encode('ASCII', 'strict'), *args)
+ outdata = s.read()
+ outdata = str(outdata, 'ASCII', 'strict')
+ if outdata != indata.lower():
+ raise support.TestFailed(
+ "While sending with <<{name:s}>> bad data "
+ "<<{outdata:s}>> ({nout:d}) received; "
+ "expected <<{indata:s}>> ({nin:d})\n".format(
+ name=meth_name, outdata=repr(outdata[:20]),
+ nout=len(outdata),
+ indata=repr(indata[:20]), nin=len(indata)
+ )
+ )
+ except ValueError as e:
+ if expect_success:
+ raise support.TestFailed(
+ "Failed to send with method <<{name:s}>>; "
+ "expected to succeed.\n".format(name=meth_name)
+ )
+ if not str(e).startswith(meth_name):
+ raise support.TestFailed(
+ "Method <<{name:s}>> failed with unexpected "
+ "exception message: {exp:s}\n".format(
+ name=meth_name, exp=e
+ )
+ )
+
+ for meth_name, recv_meth, expect_success, args in recv_methods:
+ indata = data_prefix + meth_name
+ try:
+ s.send(indata.encode('ASCII', 'strict'))
+ outdata = recv_meth(*args)
+ outdata = str(outdata, 'ASCII', 'strict')
+ if outdata != indata.lower():
+ raise support.TestFailed(
+ "While receiving with <<{name:s}>> bad data "
+ "<<{outdata:s}>> ({nout:d}) received; "
+ "expected <<{indata:s}>> ({nin:d})\n".format(
+ name=meth_name, outdata=repr(outdata[:20]),
+ nout=len(outdata),
+ indata=repr(indata[:20]), nin=len(indata)
+ )
+ )
+ except ValueError as e:
+ if expect_success:
+ raise support.TestFailed(
+ "Failed to receive with method <<{name:s}>>; "
+ "expected to succeed.\n".format(name=meth_name)
+ )
+ if not str(e).startswith(meth_name):
+ raise support.TestFailed(
+ "Method <<{name:s}>> failed with unexpected "
+ "exception message: {exp:s}\n".format(
+ name=meth_name, exp=e
+ )
+ )
+ # consume data
+ s.read()
+
+ s.write("over\n".encode("ASCII", "strict"))
+ s.close()
+ finally:
+ server.stop()
+ server.join()
+
+
def test_main(verbose=False):
if skip_expected:
raise support.TestSkipped("No SSL support")