diff options
-rw-r--r-- | Lib/_pyio.py | 47 | ||||
-rw-r--r-- | Lib/test/test_io.py | 80 | ||||
-rw-r--r-- | Misc/ACKS | 1 | ||||
-rw-r--r-- | Misc/NEWS | 6 | ||||
-rw-r--r-- | Modules/_io/bufferedio.c | 78 |
5 files changed, 152 insertions, 60 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py index fec1c6f..fcd548d 100644 --- a/Lib/_pyio.py +++ b/Lib/_pyio.py @@ -6,6 +6,7 @@ import os import abc import codecs import warnings +import errno # Import _thread instead of threading to reduce startup cost try: from _thread import allocate_lock as Lock @@ -717,8 +718,11 @@ class _BufferedIOMixin(BufferedIOBase): def close(self): if self.raw is not None and not self.closed: - self.flush() - self.raw.close() + try: + # may raise BlockingIOError or BrokenPipeError etc + self.flush() + finally: + self.raw.close() def detach(self): if self.raw is None: @@ -1077,13 +1081,9 @@ class BufferedWriter(_BufferedIOMixin): # XXX we can implement some more tricks to try and avoid # partial writes if len(self._write_buf) > self.buffer_size: - # We're full, so let's pre-flush the buffer - try: - self._flush_unlocked() - except BlockingIOError as e: - # We can't accept anything else. - # XXX Why not just let the exception pass through? - raise BlockingIOError(e.errno, e.strerror, 0) + # We're full, so let's pre-flush the buffer. (This may + # raise BlockingIOError with characters_written == 0.) + self._flush_unlocked() before = len(self._write_buf) self._write_buf.extend(b) written = len(self._write_buf) - before @@ -1114,22 +1114,21 @@ class BufferedWriter(_BufferedIOMixin): def _flush_unlocked(self): if self.closed: raise ValueError("flush of closed file") - written = 0 - try: - while self._write_buf: - try: - n = self.raw.write(self._write_buf) - except InterruptedError: - continue - if n > len(self._write_buf) or n < 0: - raise IOError("write() returned incorrect number of bytes") - del self._write_buf[:n] - written += n - except BlockingIOError as e: - n = e.characters_written + while self._write_buf: + try: + n = self.raw.write(self._write_buf) + except InterruptedError: + continue + except BlockingIOError: + raise RuntimeError("self.raw should implement RawIOBase: it " + "should not raise BlockingIOError") + if n is None: + raise BlockingIOError( + errno.EAGAIN, + "write could not complete without blocking", 0) + if n > len(self._write_buf) or n < 0: + raise IOError("write() returned incorrect number of bytes") del self._write_buf[:n] - written += n - raise BlockingIOError(e.errno, e.strerror, written) def tell(self): return _BufferedIOMixin.tell(self) + len(self._write_buf) diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 318f7a7..5954999 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -42,7 +42,10 @@ try: import threading except ImportError: threading = None - +try: + import fcntl +except ImportError: + fcntl = None def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" @@ -242,9 +245,14 @@ class MockNonBlockWriterIO: except ValueError: pass else: - self._blocker_char = None - self._write_stack.append(b[:n]) - raise self.BlockingIOError(0, "test blocking", n) + if n > 0: + # write data up to the first blocker + self._write_stack.append(b[:n]) + return n + else: + # cancel blocker and indicate would block + self._blocker_char = None + return None self._write_stack.append(b) return len(b) @@ -2753,6 +2761,70 @@ class MiscIOTest(unittest.TestCase): with self.open(support.TESTFN, **kwargs) as f: self.assertRaises(TypeError, pickle.dumps, f, protocol) + @unittest.skipUnless(fcntl, 'fcntl required for this test') + def test_nonblock_pipe_write_bigbuf(self): + self._test_nonblock_pipe_write(16*1024) + + @unittest.skipUnless(fcntl, 'fcntl required for this test') + def test_nonblock_pipe_write_smallbuf(self): + self._test_nonblock_pipe_write(1024) + + def _set_non_blocking(self, fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + self.assertNotEqual(flags, -1) + res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + self.assertEqual(res, 0) + + def _test_nonblock_pipe_write(self, bufsize): + sent = [] + received = [] + r, w = os.pipe() + self._set_non_blocking(r) + self._set_non_blocking(w) + + # To exercise all code paths in the C implementation we need + # to play with buffer sizes. For instance, if we choose a + # buffer size less than or equal to _PIPE_BUF (4096 on Linux) + # then we will never get a partial write of the buffer. + rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) + wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) + + with rf, wf: + for N in 9999, 73, 7574: + try: + i = 0 + while True: + msg = bytes([i % 26 + 97]) * N + sent.append(msg) + wf.write(msg) + i += 1 + + except self.BlockingIOError as e: + self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) + sent[-1] = sent[-1][:e.characters_written] + received.append(rf.read()) + msg = b'BLOCKED' + wf.write(msg) + sent.append(msg) + + while True: + try: + wf.flush() + break + except self.BlockingIOError as e: + self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) + self.assertEqual(e.characters_written, 0) + received.append(rf.read()) + + received += iter(rf.read, None) + + sent, received = b''.join(sent), b''.join(received) + self.assertTrue(sent == received) + self.assertTrue(wf.closed) + self.assertTrue(rf.closed) + class CMiscIOTest(MiscIOTest): io = io @@ -860,6 +860,7 @@ Ilya Sandler Mark Sapiro Ty Sarna Ben Sayer +sbt Andrew Schaaf Michael Scharf Andreas Schawo @@ -384,6 +384,12 @@ Core and Builtins Library ------- +- Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is + raised when the wrapped raw file is non-blocking and the write would block. + Previous code assumed that the raw write() would raise BlockingIOError, but + RawIOBase.write() is defined to returned None when the call would block. + Patch by sbt. + - Issue #13358: HTMLParser now calls handle_data only once for each CDATA. - Issue #4147: minidom's toprettyxml no longer adds whitespace around a text diff --git a/Modules/_io/bufferedio.c b/Modules/_io/bufferedio.c index bb52a46..8a9ae47 100644 --- a/Modules/_io/bufferedio.c +++ b/Modules/_io/bufferedio.c @@ -596,7 +596,7 @@ buffered_getstate(buffered *self, PyObject *args) /* Forward decls */ static PyObject * -_bufferedwriter_flush_unlocked(buffered *, int); +_bufferedwriter_flush_unlocked(buffered *); static Py_ssize_t _bufferedreader_fill_buffer(buffered *self); static void @@ -618,6 +618,18 @@ _bufferedreader_raw_read(buffered *self, char *start, Py_ssize_t len); * Helpers */ +/* Sets the current error to BlockingIOError */ +static void +_set_BlockingIOError(char *msg, Py_ssize_t written) +{ + PyObject *err; + err = PyObject_CallFunction(PyExc_BlockingIOError, "isn", + errno, msg, written); + if (err) + PyErr_SetObject(PyExc_BlockingIOError, err); + Py_XDECREF(err); +} + /* Returns the address of the `written` member if a BlockingIOError was raised, NULL otherwise. The error is always re-raised. */ static Py_ssize_t * @@ -772,7 +784,7 @@ buffered_flush_and_rewind_unlocked(buffered *self) { PyObject *res; - res = _bufferedwriter_flush_unlocked(self, 0); + res = _bufferedwriter_flush_unlocked(self); if (res == NULL) return NULL; Py_DECREF(res); @@ -1188,7 +1200,7 @@ buffered_seek(buffered *self, PyObject *args) /* Fallback: invoke raw seek() method and clear buffer */ if (self->writable) { - res = _bufferedwriter_flush_unlocked(self, 0); + res = _bufferedwriter_flush_unlocked(self); if (res == NULL) goto end; Py_CLEAR(res); @@ -1807,6 +1819,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len) Py_buffer buf; PyObject *memobj, *res; Py_ssize_t n; + int errnum; /* NOTE: the buffer needn't be released as its object is NULL. */ if (PyBuffer_FillInfo(&buf, NULL, start, len, 1, PyBUF_CONTIG_RO) == -1) return -1; @@ -1819,11 +1832,21 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len) raised (see issue #10956). */ do { + errno = 0; res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL); + errnum = errno; } while (res == NULL && _trap_eintr()); Py_DECREF(memobj); if (res == NULL) return -1; + if (res == Py_None) { + /* Non-blocking stream would have blocked. Special return code! + Being paranoid we reset errno in case it is changed by code + triggered by a decref. errno is used by _set_BlockingIOError(). */ + Py_DECREF(res); + errno = errnum; + return -2; + } n = PyNumber_AsSsize_t(res, PyExc_ValueError); Py_DECREF(res); if (n < 0 || n > len) { @@ -1840,7 +1863,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len) /* `restore_pos` is 1 if we need to restore the raw stream position at the end, 0 otherwise. */ static PyObject * -_bufferedwriter_flush_unlocked(buffered *self, int restore_pos) +_bufferedwriter_flush_unlocked(buffered *self) { Py_ssize_t written = 0; Py_off_t n, rewind; @@ -1862,14 +1885,11 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos) Py_SAFE_DOWNCAST(self->write_end - self->write_pos, Py_off_t, Py_ssize_t)); if (n == -1) { - Py_ssize_t *w = _buffered_check_blocking_error(); - if (w == NULL) - goto error; - self->write_pos += *w; - self->raw_pos = self->write_pos; - written += *w; - *w = written; - /* Already re-raised */ + goto error; + } + else if (n == -2) { + _set_BlockingIOError("write could not complete without blocking", + 0); goto error; } self->write_pos += n; @@ -1882,16 +1902,6 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos) goto error; } - if (restore_pos) { - Py_off_t forward = rewind - written; - if (forward != 0) { - n = _buffered_raw_seek(self, forward, 1); - if (n < 0) { - goto error; - } - self->raw_pos += forward; - } - } _bufferedwriter_reset_buf(self); end: @@ -1944,7 +1954,7 @@ bufferedwriter_write(buffered *self, PyObject *args) } /* First write the current buffer */ - res = _bufferedwriter_flush_unlocked(self, 0); + res = _bufferedwriter_flush_unlocked(self); if (res == NULL) { Py_ssize_t *w = _buffered_check_blocking_error(); if (w == NULL) @@ -1967,14 +1977,19 @@ bufferedwriter_write(buffered *self, PyObject *args) PyErr_Clear(); memcpy(self->buffer + self->write_end, buf.buf, buf.len); self->write_end += buf.len; + self->pos += buf.len; written = buf.len; goto end; } /* Buffer as much as possible. */ memcpy(self->buffer + self->write_end, buf.buf, avail); self->write_end += avail; - /* Already re-raised */ - *w = avail; + self->pos += avail; + /* XXX Modifying the existing exception e using the pointer w + will change e.characters_written but not e.args[2]. + Therefore we just replace with a new error. */ + _set_BlockingIOError("write could not complete without blocking", + avail); goto error; } Py_CLEAR(res); @@ -1999,11 +2014,9 @@ bufferedwriter_write(buffered *self, PyObject *args) Py_ssize_t n = _bufferedwriter_raw_write( self, (char *) buf.buf + written, buf.len - written); if (n == -1) { - Py_ssize_t *w = _buffered_check_blocking_error(); - if (w == NULL) - goto error; - written += *w; - remaining -= *w; + goto error; + } else if (n == -2) { + /* Write failed because raw file is non-blocking */ if (remaining > self->buffer_size) { /* Can't buffer everything, still buffer as much as possible */ memcpy(self->buffer, @@ -2011,8 +2024,9 @@ bufferedwriter_write(buffered *self, PyObject *args) self->raw_pos = 0; ADJUST_POSITION(self, self->buffer_size); self->write_end = self->buffer_size; - *w = written + self->buffer_size; - /* Already re-raised */ + written += self->buffer_size; + _set_BlockingIOError("write could not complete without " + "blocking", written); goto error; } PyErr_Clear(); |