summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/_pyio.py47
-rw-r--r--Lib/test/test_io.py80
-rw-r--r--Misc/ACKS1
-rw-r--r--Misc/NEWS6
-rw-r--r--Modules/_io/bufferedio.c78
5 files changed, 152 insertions, 60 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py
index fec1c6f..fcd548d 100644
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -6,6 +6,7 @@ import os
import abc
import codecs
import warnings
+import errno
# Import _thread instead of threading to reduce startup cost
try:
from _thread import allocate_lock as Lock
@@ -717,8 +718,11 @@ class _BufferedIOMixin(BufferedIOBase):
def close(self):
if self.raw is not None and not self.closed:
- self.flush()
- self.raw.close()
+ try:
+ # may raise BlockingIOError or BrokenPipeError etc
+ self.flush()
+ finally:
+ self.raw.close()
def detach(self):
if self.raw is None:
@@ -1077,13 +1081,9 @@ class BufferedWriter(_BufferedIOMixin):
# XXX we can implement some more tricks to try and avoid
# partial writes
if len(self._write_buf) > self.buffer_size:
- # We're full, so let's pre-flush the buffer
- try:
- self._flush_unlocked()
- except BlockingIOError as e:
- # We can't accept anything else.
- # XXX Why not just let the exception pass through?
- raise BlockingIOError(e.errno, e.strerror, 0)
+ # We're full, so let's pre-flush the buffer. (This may
+ # raise BlockingIOError with characters_written == 0.)
+ self._flush_unlocked()
before = len(self._write_buf)
self._write_buf.extend(b)
written = len(self._write_buf) - before
@@ -1114,22 +1114,21 @@ class BufferedWriter(_BufferedIOMixin):
def _flush_unlocked(self):
if self.closed:
raise ValueError("flush of closed file")
- written = 0
- try:
- while self._write_buf:
- try:
- n = self.raw.write(self._write_buf)
- except InterruptedError:
- continue
- if n > len(self._write_buf) or n < 0:
- raise IOError("write() returned incorrect number of bytes")
- del self._write_buf[:n]
- written += n
- except BlockingIOError as e:
- n = e.characters_written
+ while self._write_buf:
+ try:
+ n = self.raw.write(self._write_buf)
+ except InterruptedError:
+ continue
+ except BlockingIOError:
+ raise RuntimeError("self.raw should implement RawIOBase: it "
+ "should not raise BlockingIOError")
+ if n is None:
+ raise BlockingIOError(
+ errno.EAGAIN,
+ "write could not complete without blocking", 0)
+ if n > len(self._write_buf) or n < 0:
+ raise IOError("write() returned incorrect number of bytes")
del self._write_buf[:n]
- written += n
- raise BlockingIOError(e.errno, e.strerror, written)
def tell(self):
return _BufferedIOMixin.tell(self) + len(self._write_buf)
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 318f7a7..5954999 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -42,7 +42,10 @@ try:
import threading
except ImportError:
threading = None
-
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
def _default_chunk_size():
"""Get the default TextIOWrapper chunk size"""
@@ -242,9 +245,14 @@ class MockNonBlockWriterIO:
except ValueError:
pass
else:
- self._blocker_char = None
- self._write_stack.append(b[:n])
- raise self.BlockingIOError(0, "test blocking", n)
+ if n > 0:
+ # write data up to the first blocker
+ self._write_stack.append(b[:n])
+ return n
+ else:
+ # cancel blocker and indicate would block
+ self._blocker_char = None
+ return None
self._write_stack.append(b)
return len(b)
@@ -2753,6 +2761,70 @@ class MiscIOTest(unittest.TestCase):
with self.open(support.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
+ @unittest.skipUnless(fcntl, 'fcntl required for this test')
+ def test_nonblock_pipe_write_bigbuf(self):
+ self._test_nonblock_pipe_write(16*1024)
+
+ @unittest.skipUnless(fcntl, 'fcntl required for this test')
+ def test_nonblock_pipe_write_smallbuf(self):
+ self._test_nonblock_pipe_write(1024)
+
+ def _set_non_blocking(self, fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ self.assertNotEqual(flags, -1)
+ res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+ self.assertEqual(res, 0)
+
+ def _test_nonblock_pipe_write(self, bufsize):
+ sent = []
+ received = []
+ r, w = os.pipe()
+ self._set_non_blocking(r)
+ self._set_non_blocking(w)
+
+ # To exercise all code paths in the C implementation we need
+ # to play with buffer sizes. For instance, if we choose a
+ # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
+ # then we will never get a partial write of the buffer.
+ rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
+ wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
+
+ with rf, wf:
+ for N in 9999, 73, 7574:
+ try:
+ i = 0
+ while True:
+ msg = bytes([i % 26 + 97]) * N
+ sent.append(msg)
+ wf.write(msg)
+ i += 1
+
+ except self.BlockingIOError as e:
+ self.assertEqual(e.args[0], errno.EAGAIN)
+ self.assertEqual(e.args[2], e.characters_written)
+ sent[-1] = sent[-1][:e.characters_written]
+ received.append(rf.read())
+ msg = b'BLOCKED'
+ wf.write(msg)
+ sent.append(msg)
+
+ while True:
+ try:
+ wf.flush()
+ break
+ except self.BlockingIOError as e:
+ self.assertEqual(e.args[0], errno.EAGAIN)
+ self.assertEqual(e.args[2], e.characters_written)
+ self.assertEqual(e.characters_written, 0)
+ received.append(rf.read())
+
+ received += iter(rf.read, None)
+
+ sent, received = b''.join(sent), b''.join(received)
+ self.assertTrue(sent == received)
+ self.assertTrue(wf.closed)
+ self.assertTrue(rf.closed)
+
class CMiscIOTest(MiscIOTest):
io = io
diff --git a/Misc/ACKS b/Misc/ACKS
index acf2147..952644b 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -860,6 +860,7 @@ Ilya Sandler
Mark Sapiro
Ty Sarna
Ben Sayer
+sbt
Andrew Schaaf
Michael Scharf
Andreas Schawo
diff --git a/Misc/NEWS b/Misc/NEWS
index 90c0f3b..d3b5d64 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -384,6 +384,12 @@ Core and Builtins
Library
-------
+- Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is
+ raised when the wrapped raw file is non-blocking and the write would block.
+ Previous code assumed that the raw write() would raise BlockingIOError, but
+ RawIOBase.write() is defined to returned None when the call would block.
+ Patch by sbt.
+
- Issue #13358: HTMLParser now calls handle_data only once for each CDATA.
- Issue #4147: minidom's toprettyxml no longer adds whitespace around a text
diff --git a/Modules/_io/bufferedio.c b/Modules/_io/bufferedio.c
index bb52a46..8a9ae47 100644
--- a/Modules/_io/bufferedio.c
+++ b/Modules/_io/bufferedio.c
@@ -596,7 +596,7 @@ buffered_getstate(buffered *self, PyObject *args)
/* Forward decls */
static PyObject *
-_bufferedwriter_flush_unlocked(buffered *, int);
+_bufferedwriter_flush_unlocked(buffered *);
static Py_ssize_t
_bufferedreader_fill_buffer(buffered *self);
static void
@@ -618,6 +618,18 @@ _bufferedreader_raw_read(buffered *self, char *start, Py_ssize_t len);
* Helpers
*/
+/* Sets the current error to BlockingIOError */
+static void
+_set_BlockingIOError(char *msg, Py_ssize_t written)
+{
+ PyObject *err;
+ err = PyObject_CallFunction(PyExc_BlockingIOError, "isn",
+ errno, msg, written);
+ if (err)
+ PyErr_SetObject(PyExc_BlockingIOError, err);
+ Py_XDECREF(err);
+}
+
/* Returns the address of the `written` member if a BlockingIOError was
raised, NULL otherwise. The error is always re-raised. */
static Py_ssize_t *
@@ -772,7 +784,7 @@ buffered_flush_and_rewind_unlocked(buffered *self)
{
PyObject *res;
- res = _bufferedwriter_flush_unlocked(self, 0);
+ res = _bufferedwriter_flush_unlocked(self);
if (res == NULL)
return NULL;
Py_DECREF(res);
@@ -1188,7 +1200,7 @@ buffered_seek(buffered *self, PyObject *args)
/* Fallback: invoke raw seek() method and clear buffer */
if (self->writable) {
- res = _bufferedwriter_flush_unlocked(self, 0);
+ res = _bufferedwriter_flush_unlocked(self);
if (res == NULL)
goto end;
Py_CLEAR(res);
@@ -1807,6 +1819,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
Py_buffer buf;
PyObject *memobj, *res;
Py_ssize_t n;
+ int errnum;
/* NOTE: the buffer needn't be released as its object is NULL. */
if (PyBuffer_FillInfo(&buf, NULL, start, len, 1, PyBUF_CONTIG_RO) == -1)
return -1;
@@ -1819,11 +1832,21 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
raised (see issue #10956).
*/
do {
+ errno = 0;
res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL);
+ errnum = errno;
} while (res == NULL && _trap_eintr());
Py_DECREF(memobj);
if (res == NULL)
return -1;
+ if (res == Py_None) {
+ /* Non-blocking stream would have blocked. Special return code!
+ Being paranoid we reset errno in case it is changed by code
+ triggered by a decref. errno is used by _set_BlockingIOError(). */
+ Py_DECREF(res);
+ errno = errnum;
+ return -2;
+ }
n = PyNumber_AsSsize_t(res, PyExc_ValueError);
Py_DECREF(res);
if (n < 0 || n > len) {
@@ -1840,7 +1863,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
/* `restore_pos` is 1 if we need to restore the raw stream position at
the end, 0 otherwise. */
static PyObject *
-_bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
+_bufferedwriter_flush_unlocked(buffered *self)
{
Py_ssize_t written = 0;
Py_off_t n, rewind;
@@ -1862,14 +1885,11 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
Py_SAFE_DOWNCAST(self->write_end - self->write_pos,
Py_off_t, Py_ssize_t));
if (n == -1) {
- Py_ssize_t *w = _buffered_check_blocking_error();
- if (w == NULL)
- goto error;
- self->write_pos += *w;
- self->raw_pos = self->write_pos;
- written += *w;
- *w = written;
- /* Already re-raised */
+ goto error;
+ }
+ else if (n == -2) {
+ _set_BlockingIOError("write could not complete without blocking",
+ 0);
goto error;
}
self->write_pos += n;
@@ -1882,16 +1902,6 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
goto error;
}
- if (restore_pos) {
- Py_off_t forward = rewind - written;
- if (forward != 0) {
- n = _buffered_raw_seek(self, forward, 1);
- if (n < 0) {
- goto error;
- }
- self->raw_pos += forward;
- }
- }
_bufferedwriter_reset_buf(self);
end:
@@ -1944,7 +1954,7 @@ bufferedwriter_write(buffered *self, PyObject *args)
}
/* First write the current buffer */
- res = _bufferedwriter_flush_unlocked(self, 0);
+ res = _bufferedwriter_flush_unlocked(self);
if (res == NULL) {
Py_ssize_t *w = _buffered_check_blocking_error();
if (w == NULL)
@@ -1967,14 +1977,19 @@ bufferedwriter_write(buffered *self, PyObject *args)
PyErr_Clear();
memcpy(self->buffer + self->write_end, buf.buf, buf.len);
self->write_end += buf.len;
+ self->pos += buf.len;
written = buf.len;
goto end;
}
/* Buffer as much as possible. */
memcpy(self->buffer + self->write_end, buf.buf, avail);
self->write_end += avail;
- /* Already re-raised */
- *w = avail;
+ self->pos += avail;
+ /* XXX Modifying the existing exception e using the pointer w
+ will change e.characters_written but not e.args[2].
+ Therefore we just replace with a new error. */
+ _set_BlockingIOError("write could not complete without blocking",
+ avail);
goto error;
}
Py_CLEAR(res);
@@ -1999,11 +2014,9 @@ bufferedwriter_write(buffered *self, PyObject *args)
Py_ssize_t n = _bufferedwriter_raw_write(
self, (char *) buf.buf + written, buf.len - written);
if (n == -1) {
- Py_ssize_t *w = _buffered_check_blocking_error();
- if (w == NULL)
- goto error;
- written += *w;
- remaining -= *w;
+ goto error;
+ } else if (n == -2) {
+ /* Write failed because raw file is non-blocking */
if (remaining > self->buffer_size) {
/* Can't buffer everything, still buffer as much as possible */
memcpy(self->buffer,
@@ -2011,8 +2024,9 @@ bufferedwriter_write(buffered *self, PyObject *args)
self->raw_pos = 0;
ADJUST_POSITION(self, self->buffer_size);
self->write_end = self->buffer_size;
- *w = written + self->buffer_size;
- /* Already re-raised */
+ written += self->buffer_size;
+ _set_BlockingIOError("write could not complete without "
+ "blocking", written);
goto error;
}
PyErr_Clear();