From 3cd7c6e6eb43dbd7d7180503265772a67953e682 Mon Sep 17 00:00:00 2001 From: Olivier Grisel Date: Sat, 6 Jan 2018 16:18:54 +0100 Subject: bpo-31993: Do not allocate large temporary buffers in pickle dump. (#4353) The picklers do no longer allocate temporary memory when dumping large bytes and str objects into a file object. Instead the data is directly streamed into the underlying file object. Previously the C implementation would buffer all content and issue a single call to file.write() at the end of the dump. With protocol 4 this behavior has changed to issue one call to file.write() per frame. The Python pickler with protocol 4 now dumps each frame content as a memoryview to an IOBytes instance that is never reused and the memoryview is no longer released after the call to write. This makes it possible for the file object to delay access to the memoryview of previous frames without forcing any additional memory copy as was already possible with the C pickler. --- Lib/pickle.py | 50 ++++++-- Lib/pickletools.py | 11 +- Lib/test/pickletester.py | 131 +++++++++++++++++-- Lib/test/test_pickletools.py | 3 + .../2017-11-10-00-05-08.bpo-31993.-OMNg8.rst | 14 +++ Modules/_pickle.c | 138 +++++++++++++++++---- 6 files changed, 297 insertions(+), 50 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-11-10-00-05-08.bpo-31993.-OMNg8.rst diff --git a/Lib/pickle.py b/Lib/pickle.py index 350d4a4..301e8cf 100644 --- a/Lib/pickle.py +++ b/Lib/pickle.py @@ -201,14 +201,24 @@ class _Framer: if self.current_frame: f = self.current_frame if f.tell() >= self._FRAME_SIZE_TARGET or force: - with f.getbuffer() as data: - n = len(data) - write = self.file_write - write(FRAME) - write(pack("= 1 @@ -699,7 +725,9 @@ class _Pickler: if n <= 0xff: self.write(SHORT_BINBYTES + pack(" 0xffffffff and self.proto >= 4: - self.write(BINBYTES8 + pack("= self.framer._FRAME_SIZE_TARGET: + self._write_large_bytes(BINBYTES + pack("= 4: self.write(SHORT_BINUNICODE + pack(" 0xffffffff and self.proto >= 4: - self.write(BINUNICODE8 + pack("= self.framer._FRAME_SIZE_TARGET: + self._write_large_bytes(BINUNICODE + pack(" proto: proto = arg if pos == 0: - protoheader = p[pos: end_pos] + protoheader = p[pos:end_pos] else: opcodes.append((pos, end_pos)) else: @@ -2295,6 +2295,7 @@ def optimize(p): pickler.framer.start_framing() idx = 0 for op, arg in opcodes: + frameless = False if op is put: if arg not in newids: continue @@ -2305,8 +2306,12 @@ def optimize(p): data = pickler.get(newids[arg]) else: data = p[op:arg] - pickler.framer.commit_frame() - pickler.write(data) + frameless = len(data) > pickler.framer._FRAME_SIZE_TARGET + pickler.framer.commit_frame(force=frameless) + if frameless: + pickler.framer.file_write(data) + else: + pickler.write(data) pickler.framer.end_framing() return out.getvalue() diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index bf6116b..5d983eb 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -2042,21 +2042,40 @@ class AbstractPickleTests(unittest.TestCase): def check_frame_opcodes(self, pickled): """ Check the arguments of FRAME opcodes in a protocol 4+ pickle. + + Note that binary objects that are larger than FRAME_SIZE_TARGET are not + framed by default and are therefore considered a frame by themselves in + the following consistency check. """ - frame_opcode_size = 9 - last_arg = last_pos = None + last_arg = last_pos = last_frame_opcode_size = None + frameless_opcode_sizes = { + 'BINBYTES': 5, + 'BINUNICODE': 5, + 'BINBYTES8': 9, + 'BINUNICODE8': 9, + } for op, arg, pos in pickletools.genops(pickled): - if op.name != 'FRAME': + if op.name in frameless_opcode_sizes: + if len(arg) > self.FRAME_SIZE_TARGET: + frame_opcode_size = frameless_opcode_sizes[op.name] + arg = len(arg) + else: + continue + elif op.name == 'FRAME': + frame_opcode_size = 9 + else: continue + if last_pos is not None: # The previous frame's size should be equal to the number # of bytes up to the current frame. - frame_size = pos - last_pos - frame_opcode_size + frame_size = pos - last_pos - last_frame_opcode_size self.assertEqual(frame_size, last_arg) last_arg, last_pos = arg, pos + last_frame_opcode_size = frame_opcode_size # The last frame's size should be equal to the number of bytes up # to the pickle's end. - frame_size = len(pickled) - last_pos - frame_opcode_size + frame_size = len(pickled) - last_pos - last_frame_opcode_size self.assertEqual(frame_size, last_arg) def test_framing_many_objects(self): @@ -2076,15 +2095,36 @@ class AbstractPickleTests(unittest.TestCase): def test_framing_large_objects(self): N = 1024 * 1024 - obj = [b'x' * N, b'y' * N, b'z' * N] + obj = [b'x' * N, b'y' * N, 'z' * N] for proto in range(4, pickle.HIGHEST_PROTOCOL + 1): - with self.subTest(proto=proto): - pickled = self.dumps(obj, proto) - unpickled = self.loads(pickled) - self.assertEqual(obj, unpickled) - n_frames = count_opcode(pickle.FRAME, pickled) - self.assertGreaterEqual(n_frames, len(obj)) - self.check_frame_opcodes(pickled) + for fast in [True, False]: + with self.subTest(proto=proto, fast=fast): + if hasattr(self, 'pickler'): + buf = io.BytesIO() + pickler = self.pickler(buf, protocol=proto) + pickler.fast = fast + pickler.dump(obj) + pickled = buf.getvalue() + elif fast: + continue + else: + # Fallback to self.dumps when fast=False and + # self.pickler is not available. + pickled = self.dumps(obj, proto) + unpickled = self.loads(pickled) + # More informative error message in case of failure. + self.assertEqual([len(x) for x in obj], + [len(x) for x in unpickled]) + # Perform full equality check if the lengths match. + self.assertEqual(obj, unpickled) + n_frames = count_opcode(pickle.FRAME, pickled) + if not fast: + # One frame per memoize for each large object. + self.assertGreaterEqual(n_frames, len(obj)) + else: + # One frame at the beginning and one at the end. + self.assertGreaterEqual(n_frames, 2) + self.check_frame_opcodes(pickled) def test_optional_frames(self): if pickle.HIGHEST_PROTOCOL < 4: @@ -2125,6 +2165,71 @@ class AbstractPickleTests(unittest.TestCase): count_opcode(pickle.FRAME, pickled)) self.assertEqual(obj, self.loads(some_frames_pickle)) + def test_framed_write_sizes_with_delayed_writer(self): + class ChunkAccumulator: + """Accumulate pickler output in a list of raw chunks.""" + + def __init__(self): + self.chunks = [] + + def write(self, chunk): + self.chunks.append(chunk) + + def concatenate_chunks(self): + # Some chunks can be memoryview instances, we need to convert + # them to bytes to be able to call join + return b"".join([c.tobytes() if hasattr(c, 'tobytes') else c + for c in self.chunks]) + + small_objects = [(str(i).encode('ascii'), i % 42, {'i': str(i)}) + for i in range(int(1e4))] + + for proto in range(4, pickle.HIGHEST_PROTOCOL + 1): + # Protocol 4 packs groups of small objects into frames and issues + # calls to write only once or twice per frame: + # The C pickler issues one call to write per-frame (header and + # contents) while Python pickler issues two calls to write: one for + # the frame header and one for the frame binary contents. + writer = ChunkAccumulator() + self.pickler(writer, proto).dump(small_objects) + + # Actually read the binary content of the chunks after the end + # of the call to dump: ant memoryview passed to write should not + # be released otherwise this delayed access would not be possible. + pickled = writer.concatenate_chunks() + reconstructed = self.loads(pickled) + self.assertEqual(reconstructed, small_objects) + self.assertGreater(len(writer.chunks), 1) + + n_frames, remainder = divmod(len(pickled), self.FRAME_SIZE_TARGET) + if remainder > 0: + n_frames += 1 + + # There should be at least one call to write per frame + self.assertGreaterEqual(len(writer.chunks), n_frames) + + # but not too many either: there can be one for the proto, + # one per-frame header and one per frame for the actual contents. + self.assertGreaterEqual(2 * n_frames + 1, len(writer.chunks)) + + chunk_sizes = [len(c) for c in writer.chunks[:-1]] + large_sizes = [s for s in chunk_sizes + if s >= self.FRAME_SIZE_TARGET] + small_sizes = [s for s in chunk_sizes + if s < self.FRAME_SIZE_TARGET] + + # Large chunks should not be too large: + for chunk_size in large_sizes: + self.assertGreater(2 * self.FRAME_SIZE_TARGET, chunk_size) + + last_chunk_size = len(writer.chunks[-1]) + self.assertGreater(2 * self.FRAME_SIZE_TARGET, last_chunk_size) + + # Small chunks (if any) should be very small + # (only proto and frame headers) + for chunk_size in small_sizes: + self.assertGreaterEqual(9, chunk_size) + def test_nested_names(self): global Nested class Nested: diff --git a/Lib/test/test_pickletools.py b/Lib/test/test_pickletools.py index b3cab0e..e40a958 100644 --- a/Lib/test/test_pickletools.py +++ b/Lib/test/test_pickletools.py @@ -15,6 +15,9 @@ class OptimizedPickleTests(AbstractPickleTests): # Test relies on precise output of dumps() test_pickle_to_2x = None + # Test relies on writing by chunks into a file object. + test_framed_write_sizes_with_delayed_writer = None + def test_optimize_long_binget(self): data = [str(i) for i in range(257)] data.append(data[-1]) diff --git a/Misc/NEWS.d/next/Library/2017-11-10-00-05-08.bpo-31993.-OMNg8.rst b/Misc/NEWS.d/next/Library/2017-11-10-00-05-08.bpo-31993.-OMNg8.rst new file mode 100644 index 0000000..b453e21 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-11-10-00-05-08.bpo-31993.-OMNg8.rst @@ -0,0 +1,14 @@ +The picklers do no longer allocate temporary memory when dumping large +``bytes`` and ``str`` objects into a file object. Instead the data is +directly streamed into the underlying file object. + +Previously the C implementation would buffer all content and issue a +single call to ``file.write`` at the end of the dump. With protocol 4 +this behavior has changed to issue one call to ``file.write`` per frame. + +The Python pickler with protocol 4 now dumps each frame content as a +memoryview to an IOBytes instance that is never reused and the +memoryview is no longer released after the call to write. This makes it +possible for the file object to delay access to the memoryview of +previous frames without forcing any additional memory copy as was +already possible with the C pickler. diff --git a/Modules/_pickle.c b/Modules/_pickle.c index da915ef..5cb1fba 100644 --- a/Modules/_pickle.c +++ b/Modules/_pickle.c @@ -971,20 +971,6 @@ _Pickler_CommitFrame(PicklerObject *self) return 0; } -static int -_Pickler_OpcodeBoundary(PicklerObject *self) -{ - Py_ssize_t frame_len; - - if (!self->framing || self->frame_start == -1) - return 0; - frame_len = self->output_len - self->frame_start - FRAME_HEADER_SIZE; - if (frame_len >= FRAME_SIZE_TARGET) - return _Pickler_CommitFrame(self); - else - return 0; -} - static PyObject * _Pickler_GetString(PicklerObject *self) { @@ -1019,6 +1005,38 @@ _Pickler_FlushToFile(PicklerObject *self) return (result == NULL) ? -1 : 0; } +static int +_Pickler_OpcodeBoundary(PicklerObject *self) +{ + Py_ssize_t frame_len; + + if (!self->framing || self->frame_start == -1) { + return 0; + } + frame_len = self->output_len - self->frame_start - FRAME_HEADER_SIZE; + if (frame_len >= FRAME_SIZE_TARGET) { + if(_Pickler_CommitFrame(self)) { + return -1; + } + /* Flush the content of the commited frame to the underlying + * file and reuse the pickler buffer for the next frame so as + * to limit memory usage when dumping large complex objects to + * a file. + * + * self->write is NULL when called via dumps. + */ + if (self->write != NULL) { + if (_Pickler_FlushToFile(self) < 0) { + return -1; + } + if (_Pickler_ClearBuffer(self) < 0) { + return -1; + } + } + } + return 0; +} + static Py_ssize_t _Pickler_Write(PicklerObject *self, const char *s, Py_ssize_t data_len) { @@ -2124,6 +2142,51 @@ done: return 0; } +/* No-copy code-path to write large contiguous data directly into the + underlying file object, bypassing the output_buffer of the Pickler. */ +static int +_Pickler_write_large_bytes( + PicklerObject *self, const char *header, Py_ssize_t header_size, + PyObject *payload) +{ + assert(self->output_buffer != NULL); + assert(self->write != NULL); + PyObject *result; + + /* Commit the previous frame. */ + if (_Pickler_CommitFrame(self)) { + return -1; + } + /* Disable frameing temporarily */ + self->framing = 0; + + if (_Pickler_Write(self, header, header_size) < 0) { + return -1; + } + /* Dump the output buffer to the file. */ + if (_Pickler_FlushToFile(self) < 0) { + return -1; + } + + /* Stream write the payload into the file without going through the + output buffer. */ + result = PyObject_CallFunctionObjArgs(self->write, payload, NULL); + if (result == NULL) { + return -1; + } + Py_DECREF(result); + + /* Reinitialize the buffer for subsequent calls to _Pickler_Write. */ + if (_Pickler_ClearBuffer(self) < 0) { + return -1; + } + + /* Re-enable framing for subsequent calls to _Pickler_Write. */ + self->framing = 1; + + return 0; +} + static int save_bytes(PicklerObject *self, PyObject *obj) { @@ -2202,11 +2265,21 @@ save_bytes(PicklerObject *self, PyObject *obj) return -1; /* string too large */ } - if (_Pickler_Write(self, header, len) < 0) - return -1; - - if (_Pickler_Write(self, PyBytes_AS_STRING(obj), size) < 0) - return -1; + if (size < FRAME_SIZE_TARGET || self->write == NULL) { + if (_Pickler_Write(self, header, len) < 0) { + return -1; + } + if (_Pickler_Write(self, PyBytes_AS_STRING(obj), size) < 0) { + return -1; + } + } + else { + /* Bypass the in-memory buffer to directly stream large data + into the underlying file object. */ + if (_Pickler_write_large_bytes(self, header, len, obj) < 0) { + return -1; + } + } if (memo_put(self, obj) < 0) return -1; @@ -2291,6 +2364,7 @@ write_utf8(PicklerObject *self, const char *data, Py_ssize_t size) { char header[9]; Py_ssize_t len; + PyObject *mem; assert(size >= 0); if (size <= 0xff && self->proto >= 4) { @@ -2317,11 +2391,27 @@ write_utf8(PicklerObject *self, const char *data, Py_ssize_t size) return -1; } - if (_Pickler_Write(self, header, len) < 0) - return -1; - if (_Pickler_Write(self, data, size) < 0) - return -1; - + if (size < FRAME_SIZE_TARGET || self->write == NULL) { + if (_Pickler_Write(self, header, len) < 0) { + return -1; + } + if (_Pickler_Write(self, data, size) < 0) { + return -1; + } + } + else { + /* Bypass the in-memory buffer to directly stream large data + into the underlying file object. */ + mem = PyMemoryView_FromMemory((char *) data, size, PyBUF_READ); + if (mem == NULL) { + return -1; + } + if (_Pickler_write_large_bytes(self, header, len, mem) < 0) { + Py_DECREF(mem); + return -1; + } + Py_DECREF(mem); + } return 0; } -- cgit v0.12