diff options
Diffstat (limited to 'Lib/test/test_buffer.py')
-rw-r--r-- | Lib/test/test_buffer.py | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/Lib/test/test_buffer.py b/Lib/test/test_buffer.py index 098d2d9..b6e82ad 100644 --- a/Lib/test/test_buffer.py +++ b/Lib/test/test_buffer.py @@ -17,6 +17,7 @@ import contextlib import unittest from test import support from test.support import os_helper +import inspect from itertools import permutations, product from random import randrange, sample, choice import warnings @@ -4438,5 +4439,146 @@ class TestBufferProtocol(unittest.TestCase): struct.calcsize(format)) +class TestPythonBufferProtocol(unittest.TestCase): + def test_basic(self): + class MyBuffer: + def __buffer__(self, flags): + return memoryview(b"hello") + + mv = memoryview(MyBuffer()) + self.assertEqual(mv.tobytes(), b"hello") + self.assertEqual(bytes(MyBuffer()), b"hello") + + def test_bad_buffer_method(self): + class MustReturnMV: + def __buffer__(self, flags): + return 42 + + self.assertRaises(TypeError, memoryview, MustReturnMV()) + + class NoBytesEither: + def __buffer__(self, flags): + return b"hello" + + self.assertRaises(TypeError, memoryview, NoBytesEither()) + + class WrongArity: + def __buffer__(self): + return memoryview(b"hello") + + self.assertRaises(TypeError, memoryview, WrongArity()) + + def test_release_buffer(self): + class WhatToRelease: + def __init__(self): + self.held = False + self.ba = bytearray(b"hello") + + def __buffer__(self, flags): + if self.held: + raise TypeError("already held") + self.held = True + return memoryview(self.ba) + + def __release_buffer__(self, buffer): + self.held = False + + wr = WhatToRelease() + self.assertFalse(wr.held) + with memoryview(wr) as mv: + self.assertTrue(wr.held) + self.assertEqual(mv.tobytes(), b"hello") + self.assertFalse(wr.held) + + def test_same_buffer_returned(self): + class WhatToRelease: + def __init__(self): + self.held = False + self.ba = bytearray(b"hello") + self.created_mv = None + + def __buffer__(self, flags): + if self.held: + raise TypeError("already held") + self.held = True + self.created_mv = memoryview(self.ba) + return self.created_mv + + def __release_buffer__(self, buffer): + assert buffer is self.created_mv + self.held = False + + wr = WhatToRelease() + self.assertFalse(wr.held) + with memoryview(wr) as mv: + self.assertTrue(wr.held) + self.assertEqual(mv.tobytes(), b"hello") + self.assertFalse(wr.held) + + def test_buffer_flags(self): + class PossiblyMutable: + def __init__(self, data, mutable) -> None: + self._data = bytearray(data) + self._mutable = mutable + + def __buffer__(self, flags): + if flags & inspect.BufferFlags.WRITABLE: + if not self._mutable: + raise RuntimeError("not mutable") + return memoryview(self._data) + else: + return memoryview(bytes(self._data)) + + mutable = PossiblyMutable(b"hello", True) + immutable = PossiblyMutable(b"hello", False) + with memoryview._from_flags(mutable, inspect.BufferFlags.WRITABLE) as mv: + self.assertEqual(mv.tobytes(), b"hello") + mv[0] = ord(b'x') + self.assertEqual(mv.tobytes(), b"xello") + with memoryview._from_flags(mutable, inspect.BufferFlags.SIMPLE) as mv: + self.assertEqual(mv.tobytes(), b"xello") + with self.assertRaises(TypeError): + mv[0] = ord(b'h') + self.assertEqual(mv.tobytes(), b"xello") + with memoryview._from_flags(immutable, inspect.BufferFlags.SIMPLE) as mv: + self.assertEqual(mv.tobytes(), b"hello") + with self.assertRaises(TypeError): + mv[0] = ord(b'x') + self.assertEqual(mv.tobytes(), b"hello") + + with self.assertRaises(RuntimeError): + memoryview._from_flags(immutable, inspect.BufferFlags.WRITABLE) + with memoryview(immutable) as mv: + self.assertEqual(mv.tobytes(), b"hello") + with self.assertRaises(TypeError): + mv[0] = ord(b'x') + self.assertEqual(mv.tobytes(), b"hello") + + def test_call_builtins(self): + ba = bytearray(b"hello") + mv = ba.__buffer__(0) + self.assertEqual(mv.tobytes(), b"hello") + ba.__release_buffer__(mv) + with self.assertRaises(OverflowError): + ba.__buffer__(sys.maxsize + 1) + + @unittest.skipIf(_testcapi is None, "requires _testcapi") + def test_c_buffer(self): + buf = _testcapi.testBuf() + self.assertEqual(buf.references, 0) + mv = buf.__buffer__(0) + self.assertIsInstance(mv, memoryview) + self.assertEqual(mv.tobytes(), b"test") + self.assertEqual(buf.references, 1) + buf.__release_buffer__(mv) + self.assertEqual(buf.references, 0) + with self.assertRaises(ValueError): + mv.tobytes() + # Calling it again doesn't cause issues + with self.assertRaises(ValueError): + buf.__release_buffer__(mv) + self.assertEqual(buf.references, 0) + + if __name__ == "__main__": unittest.main() |