path: root/Lib
diff options
authorChristian Heimes <>2016-09-05 21:54:41 (GMT)
committerChristian Heimes <>2016-09-05 21:54:41 (GMT)
commitdffa3949c7431e819c64a890bce41fe769e47da1 (patch)
tree76a4293c96ba8a839d90cda3e74e6dd91cce9274 /Lib
parent92a6c170e6897ee98c36a3a9087b1a7d3e054d2b (diff)
Issue #27744: Add AF_ALG (Linux Kernel crypto) to socket module.
Diffstat (limited to 'Lib')
1 files changed, 165 insertions, 0 deletions
diff --git a/Lib/test/ b/Lib/test/
index 7fce13e..6fc6e27 100644
--- a/Lib/test/
+++ b/Lib/test/
@@ -5325,6 +5325,170 @@ class SendfileUsingSendfileTest(SendfileUsingSendTest):
def meth_from_sock(self, sock):
return getattr(sock, "_sendfile_use_sendfile")
+@unittest.skipUnless(hasattr(socket, "AF_ALG"), 'AF_ALG required')
+class LinuxKernelCryptoAPI(unittest.TestCase):
+ # tests for AF_ALG
+ def create_alg(self, typ, name):
+ sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
+ sock.bind((typ, name))
+ return sock
+ def test_sha256(self):
+ expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
+ "177a9cb410ff61f20015ad")
+ with self.create_alg('hash', 'sha256') as algo:
+ op, _ = algo.accept()
+ with op:
+ op.sendall(b"abc")
+ self.assertEqual(op.recv(512), expected)
+ op, _ = algo.accept()
+ with op:
+ op.send(b'a', socket.MSG_MORE)
+ op.send(b'b', socket.MSG_MORE)
+ op.send(b'c', socket.MSG_MORE)
+ op.send(b'')
+ self.assertEqual(op.recv(512), expected)
+ def test_hmac_sha1(self):
+ expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
+ with self.create_alg('hash', 'hmac(sha1)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
+ op, _ = algo.accept()
+ with op:
+ op.sendall(b"what do ya want for nothing?")
+ self.assertEqual(op.recv(512), expected)
+ def test_aes_cbc(self):
+ key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
+ iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
+ msg = b"Single block msg"
+ ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
+ msglen = len(msg)
+ with self.create_alg('skcipher', 'cbc(aes)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
+ flags=socket.MSG_MORE)
+ op.sendall(msg)
+ self.assertEqual(op.recv(msglen), ciphertext)
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg([ciphertext],
+ op=socket.ALG_OP_DECRYPT, iv=iv)
+ self.assertEqual(op.recv(msglen), msg)
+ # long message
+ multiplier = 1024
+ longmsg = [msg] * multiplier
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(longmsg,
+ op=socket.ALG_OP_ENCRYPT, iv=iv)
+ enc = op.recv(msglen * multiplier)
+ self.assertEqual(len(enc), msglen * multiplier)
+ self.assertTrue(enc[:msglen], ciphertext)
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg([enc],
+ op=socket.ALG_OP_DECRYPT, iv=iv)
+ dec = op.recv(msglen * multiplier)
+ self.assertEqual(len(dec), msglen * multiplier)
+ self.assertEqual(dec, msg * multiplier)
+ @support.requires_linux_version(3, 19)
+ def test_aead_aes_gcm(self):
+ key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
+ iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
+ plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
+ assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
+ expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
+ expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
+ taglen = len(expected_tag)
+ assoclen = len(assoc)
+ with self.create_alg('aead', 'gcm(aes)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
+ None, taglen)
+ # send assoc, plain and tag buffer in separate steps
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
+ assoclen=assoclen, flags=socket.MSG_MORE)
+ op.sendall(assoc, socket.MSG_MORE)
+ op.sendall(plain, socket.MSG_MORE)
+ op.sendall(b'\x00' * taglen)
+ res = op.recv(assoclen + len(plain) + taglen)
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+ # now with msg
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + plain + b'\x00' * taglen
+ op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
+ assoclen=assoclen)
+ res = op.recv(assoclen + len(plain) + taglen)
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+ # create anc data manually
+ pack_uint32 = struct.Struct('I').pack
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + plain + b'\x00' * taglen
+ op.sendmsg(
+ [msg],
+ ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
+ [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
+ [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
+ )
+ )
+ res = op.recv(len(msg))
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+ # decrypt and verify
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + expected_ct + expected_tag
+ op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
+ assoclen=assoclen)
+ res = op.recv(len(msg))
+ self.assertEqual(plain, res[assoclen:-taglen])
+ def test_drbg_pr_sha256(self):
+ # deterministic random bit generator, prediction resistance, sha256
+ with self.create_alg('rng', 'drbg_pr_sha256') as algo:
+ extra_seed = os.urandom(32)
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
+ op, _ = algo.accept()
+ with op:
+ rn = op.recv(32)
+ self.assertEqual(len(rn), 32)
+ def test_sendmsg_afalg_args(self):
+ sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg()
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=None)
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(1)
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
def test_main():
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
@@ -5352,6 +5516,7 @@ def test_main():
tests.extend([TIPCTest, TIPCThreadableTest])
tests.extend([BasicCANTest, CANTest])
tests.extend([BasicRDSTest, RDSTest])
+ tests.append(LinuxKernelCryptoAPI)