summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Doc/library/asyncio-eventloop.rst15
-rw-r--r--Lib/asyncio/events.py3
-rw-r--r--Lib/asyncio/proactor_events.py3
-rw-r--r--Lib/asyncio/selector_events.py35
-rw-r--r--Lib/asyncio/windows_events.py22
-rw-r--r--Lib/test/test_asyncio/test_events.py26
-rw-r--r--Lib/test/test_asyncio/test_proactor_events.py5
-rw-r--r--Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst1
-rw-r--r--Modules/overlapped.c214
9 files changed, 266 insertions, 58 deletions
diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index 83bbb70..ade3739 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -554,6 +554,21 @@ Low-level socket operations
This method is a :ref:`coroutine <coroutine>`.
+.. coroutinemethod:: AbstractEventLoop.sock_recv_into(sock, buf)
+
+ Receive data from the socket. Modeled after blocking
+ :meth:`socket.socket.recv_into` method.
+
+ The received data is written into *buf* (a writable buffer).
+ The return value is the number of bytes written.
+
+ With :class:`SelectorEventLoop` event loop, the socket *sock* must be
+ non-blocking.
+
+ This method is a :ref:`coroutine <coroutine>`.
+
+ .. versionadded:: 3.7
+
.. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data)
Send data to the socket. Modeled after blocking
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 03af699..0dbd92c 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -461,6 +461,9 @@ class AbstractEventLoop:
def sock_recv(self, sock, nbytes):
raise NotImplementedError
+ def sock_recv_into(self, sock, buf):
+ raise NotImplementedError
+
def sock_sendall(self, sock, data):
raise NotImplementedError
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 642f61e..5e7a397 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -439,6 +439,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
def sock_recv(self, sock, n):
return self._proactor.recv(sock, n)
+ def sock_recv_into(self, sock, buf):
+ return self._proactor.recv_into(sock, buf)
+
def sock_sendall(self, sock, data):
return self._proactor.send(sock, data)
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 4b40356..7143ca2 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -386,6 +386,41 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
fut.set_result(data)
+ def sock_recv_into(self, sock, buf):
+ """Receive data from the socket.
+
+ The received data is written into *buf* (a writable buffer).
+ The return value is the number of bytes written.
+
+ This method is a coroutine.
+ """
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ fut = self.create_future()
+ self._sock_recv_into(fut, False, sock, buf)
+ return fut
+
+ def _sock_recv_into(self, fut, registered, sock, buf):
+ # _sock_recv_into() can add itself as an I/O callback if the operation
+ # can't be done immediately. Don't use it directly, call sock_recv_into().
+ fd = sock.fileno()
+ if registered:
+ # Remove the callback early. It should be rare that the
+ # selector says the fd is ready but the call still returns
+ # EAGAIN, and I am willing to take a hit in that case in
+ # order to simplify the common case.
+ self.remove_reader(fd)
+ if fut.cancelled():
+ return
+ try:
+ nbytes = sock.recv_into(buf)
+ except (BlockingIOError, InterruptedError):
+ self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
+ except Exception as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(nbytes)
+
def sock_sendall(self, sock, data):
"""Send data to the socket.
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index b777dd0..6045ba0 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -448,6 +448,28 @@ class IocpProactor:
return self._register(ov, conn, finish_recv)
+ def recv_into(self, conn, buf, flags=0):
+ self._register_with_iocp(conn)
+ ov = _overlapped.Overlapped(NULL)
+ try:
+ if isinstance(conn, socket.socket):
+ ov.WSARecvInto(conn.fileno(), buf, flags)
+ else:
+ ov.ReadFileInto(conn.fileno(), buf)
+ except BrokenPipeError:
+ return self._result(b'')
+
+ def finish_recv(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+
+ return self._register(ov, conn, finish_recv)
+
def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 736f703..0ea9c08 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -427,6 +427,9 @@ class EventLoopTestsMixin:
self.loop.sock_recv(sock, 1024))
with self.assertRaises(ValueError):
self.loop.run_until_complete(
+ self.loop.sock_recv_into(sock, bytearray()))
+ with self.assertRaises(ValueError):
+ self.loop.run_until_complete(
self.loop.sock_accept(sock))
# test in non-blocking mode
@@ -443,16 +446,37 @@ class EventLoopTestsMixin:
sock.close()
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ def _basetest_sock_recv_into(self, httpd, sock):
+ # same as _basetest_sock_client_ops, but using sock_recv_into
+ sock.setblocking(False)
+ self.loop.run_until_complete(
+ self.loop.sock_connect(sock, httpd.address))
+ self.loop.run_until_complete(
+ self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+ data = bytearray(1024)
+ with memoryview(data) as buf:
+ nbytes = self.loop.run_until_complete(
+ self.loop.sock_recv_into(sock, buf[:1024]))
+ # consume data
+ self.loop.run_until_complete(
+ self.loop.sock_recv_into(sock, buf[nbytes:]))
+ sock.close()
+ self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+
def test_sock_client_ops(self):
with test_utils.run_test_server() as httpd:
sock = socket.socket()
self._basetest_sock_client_ops(httpd, sock)
+ sock = socket.socket()
+ self._basetest_sock_recv_into(httpd, sock)
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_unix_sock_client_ops(self):
with test_utils.run_test_unix_server() as httpd:
sock = socket.socket(socket.AF_UNIX)
self._basetest_sock_client_ops(httpd, sock)
+ sock = socket.socket(socket.AF_UNIX)
+ self._basetest_sock_recv_into(httpd, sock)
def test_sock_client_fail(self):
# Make sure that we will get an unused port
@@ -2613,6 +2637,8 @@ class AbstractEventLoopTests(unittest.TestCase):
self.assertRaises(
NotImplementedError, loop.sock_recv, f, 10)
self.assertRaises(
+ NotImplementedError, loop.sock_recv_into, f, 10)
+ self.assertRaises(
NotImplementedError, loop.sock_sendall, f, 10)
self.assertRaises(
NotImplementedError, loop.sock_connect, f, f)
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
index d76da66..7a8b523 100644
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -489,6 +489,11 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self.loop.sock_recv(self.sock, 1024)
self.proactor.recv.assert_called_with(self.sock, 1024)
+ def test_sock_recv_into(self):
+ buf = bytearray(10)
+ self.loop.sock_recv_into(self.sock, buf)
+ self.proactor.recv_into.assert_called_with(self.sock, buf)
+
def test_sock_sendall(self):
self.loop.sock_sendall(self.sock, b'data')
self.proactor.send.assert_called_with(self.sock, b'data')
diff --git a/Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst b/Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst
new file mode 100644
index 0000000..7bdb0d9
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-10-19-20-03-13.bpo-31819.mw2wF9.rst
@@ -0,0 +1 @@
+Add AbstractEventLoop.sock_recv_into().
diff --git a/Modules/overlapped.c b/Modules/overlapped.c
index 6099d46..9355ce6 100644
--- a/Modules/overlapped.c
+++ b/Modules/overlapped.c
@@ -37,8 +37,8 @@
#define T_HANDLE T_POINTER
-enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_WRITE, TYPE_ACCEPT,
- TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE,
+enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_READINTO, TYPE_WRITE,
+ TYPE_ACCEPT, TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE,
TYPE_WAIT_NAMED_PIPE_AND_CONNECT};
typedef struct {
@@ -51,10 +51,10 @@ typedef struct {
/* Type of operation */
DWORD type;
union {
- /* Buffer used for reading: TYPE_READ and TYPE_ACCEPT */
- PyObject *read_buffer;
- /* Buffer used for writing: TYPE_WRITE */
- Py_buffer write_buffer;
+ /* Buffer allocated by us: TYPE_READ and TYPE_ACCEPT */
+ PyObject *allocated_buffer;
+ /* Buffer passed by the user: TYPE_WRITE and TYPE_READINTO */
+ Py_buffer user_buffer;
};
} OverlappedObject;
@@ -550,9 +550,9 @@ Overlapped_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
self->handle = NULL;
self->error = 0;
self->type = TYPE_NONE;
- self->read_buffer = NULL;
+ self->allocated_buffer = NULL;
memset(&self->overlapped, 0, sizeof(OVERLAPPED));
- memset(&self->write_buffer, 0, sizeof(Py_buffer));
+ memset(&self->user_buffer, 0, sizeof(Py_buffer));
if (event)
self->overlapped.hEvent = event;
return (PyObject *)self;
@@ -597,11 +597,12 @@ Overlapped_dealloc(OverlappedObject *self)
switch (self->type) {
case TYPE_READ:
case TYPE_ACCEPT:
- Py_CLEAR(self->read_buffer);
+ Py_CLEAR(self->allocated_buffer);
break;
case TYPE_WRITE:
- if (self->write_buffer.obj)
- PyBuffer_Release(&self->write_buffer);
+ case TYPE_READINTO:
+ if (self->user_buffer.obj)
+ PyBuffer_Release(&self->user_buffer);
break;
}
PyObject_Del(self);
@@ -676,7 +677,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args)
case ERROR_MORE_DATA:
break;
case ERROR_BROKEN_PIPE:
- if ((self->type == TYPE_READ || self->type == TYPE_ACCEPT) && self->read_buffer != NULL)
+ if (self->type == TYPE_READ || self->type == TYPE_READINTO)
break;
/* fall through */
default:
@@ -685,17 +686,45 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args)
switch (self->type) {
case TYPE_READ:
- assert(PyBytes_CheckExact(self->read_buffer));
- if (transferred != PyBytes_GET_SIZE(self->read_buffer) &&
- _PyBytes_Resize(&self->read_buffer, transferred))
+ assert(PyBytes_CheckExact(self->allocated_buffer));
+ if (transferred != PyBytes_GET_SIZE(self->allocated_buffer) &&
+ _PyBytes_Resize(&self->allocated_buffer, transferred))
return NULL;
- Py_INCREF(self->read_buffer);
- return self->read_buffer;
+ Py_INCREF(self->allocated_buffer);
+ return self->allocated_buffer;
default:
return PyLong_FromUnsignedLong((unsigned long) transferred);
}
}
+static PyObject *
+do_ReadFile(OverlappedObject *self, HANDLE handle,
+ char *bufstart, DWORD buflen)
+{
+ DWORD nread;
+ int ret;
+ DWORD err;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = ReadFile(handle, bufstart, buflen, &nread,
+ &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_BROKEN_PIPE:
+ mark_as_completed(&self->overlapped);
+ return SetFromWindowsErr(err);
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return SetFromWindowsErr(err);
+ }
+}
+
PyDoc_STRVAR(
Overlapped_ReadFile_doc,
"ReadFile(handle, size) -> Overlapped[message]\n\n"
@@ -706,10 +735,7 @@ Overlapped_ReadFile(OverlappedObject *self, PyObject *args)
{
HANDLE handle;
DWORD size;
- DWORD nread;
PyObject *buf;
- BOOL ret;
- DWORD err;
if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &handle, &size))
return NULL;
@@ -728,14 +754,66 @@ Overlapped_ReadFile(OverlappedObject *self, PyObject *args)
self->type = TYPE_READ;
self->handle = handle;
- self->read_buffer = buf;
+ self->allocated_buffer = buf;
+
+ return do_ReadFile(self, handle, PyBytes_AS_STRING(buf), size);
+}
+
+PyDoc_STRVAR(
+ Overlapped_ReadFileInto_doc,
+ "ReadFileInto(handle, buf) -> Overlapped[bytes_transferred]\n\n"
+ "Start overlapped receive");
+
+static PyObject *
+Overlapped_ReadFileInto(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ PyObject *bufobj;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O", &handle, &bufobj))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ if (!PyArg_Parse(bufobj, "y*", &self->user_buffer))
+ return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+ if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) {
+ PyBuffer_Release(&self->user_buffer);
+ PyErr_SetString(PyExc_ValueError, "buffer too large");
+ return NULL;
+ }
+#endif
+
+ self->type = TYPE_READINTO;
+ self->handle = handle;
+
+ return do_ReadFile(self, handle, self->user_buffer.buf,
+ (DWORD)self->user_buffer.len);
+}
+
+static PyObject *
+do_WSARecv(OverlappedObject *self, HANDLE handle,
+ char *bufstart, DWORD buflen, DWORD flags)
+{
+ DWORD nread;
+ WSABUF wsabuf;
+ int ret;
+ DWORD err;
+
+ wsabuf.buf = bufstart;
+ wsabuf.len = buflen;
Py_BEGIN_ALLOW_THREADS
- ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread,
- &self->overlapped);
+ ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags,
+ &self->overlapped, NULL);
Py_END_ALLOW_THREADS
- self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS);
switch (err) {
case ERROR_BROKEN_PIPE:
mark_as_completed(&self->overlapped);
@@ -761,11 +839,7 @@ Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
HANDLE handle;
DWORD size;
DWORD flags = 0;
- DWORD nread;
PyObject *buf;
- WSABUF wsabuf;
- int ret;
- DWORD err;
if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD,
&handle, &size, &flags))
@@ -785,28 +859,48 @@ Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
self->type = TYPE_READ;
self->handle = handle;
- self->read_buffer = buf;
- wsabuf.len = size;
- wsabuf.buf = PyBytes_AS_STRING(buf);
+ self->allocated_buffer = buf;
- Py_BEGIN_ALLOW_THREADS
- ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags,
- &self->overlapped, NULL);
- Py_END_ALLOW_THREADS
+ return do_WSARecv(self, handle, PyBytes_AS_STRING(buf), size, flags);
+}
- self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS);
- switch (err) {
- case ERROR_BROKEN_PIPE:
- mark_as_completed(&self->overlapped);
- return SetFromWindowsErr(err);
- case ERROR_SUCCESS:
- case ERROR_MORE_DATA:
- case ERROR_IO_PENDING:
- Py_RETURN_NONE;
- default:
- self->type = TYPE_NOT_STARTED;
- return SetFromWindowsErr(err);
+PyDoc_STRVAR(
+ Overlapped_WSARecvInto_doc,
+ "WSARecvInto(handle, buf, flags) -> Overlapped[bytes_transferred]\n\n"
+ "Start overlapped receive");
+
+static PyObject *
+Overlapped_WSARecvInto(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ PyObject *bufobj;
+ DWORD flags;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD,
+ &handle, &bufobj, &flags))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ if (!PyArg_Parse(bufobj, "y*", &self->user_buffer))
+ return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+ if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) {
+ PyBuffer_Release(&self->user_buffer);
+ PyErr_SetString(PyExc_ValueError, "buffer too large");
+ return NULL;
}
+#endif
+
+ self->type = TYPE_READINTO;
+ self->handle = handle;
+
+ return do_WSARecv(self, handle, self->user_buffer.buf,
+ (DWORD)self->user_buffer.len, flags);
}
PyDoc_STRVAR(
@@ -831,12 +925,12 @@ Overlapped_WriteFile(OverlappedObject *self, PyObject *args)
return NULL;
}
- if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+ if (!PyArg_Parse(bufobj, "y*", &self->user_buffer))
return NULL;
#if SIZEOF_SIZE_T > SIZEOF_LONG
- if (self->write_buffer.len > (Py_ssize_t)ULONG_MAX) {
- PyBuffer_Release(&self->write_buffer);
+ if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) {
+ PyBuffer_Release(&self->user_buffer);
PyErr_SetString(PyExc_ValueError, "buffer too large");
return NULL;
}
@@ -846,8 +940,8 @@ Overlapped_WriteFile(OverlappedObject *self, PyObject *args)
self->handle = handle;
Py_BEGIN_ALLOW_THREADS
- ret = WriteFile(handle, self->write_buffer.buf,
- (DWORD)self->write_buffer.len,
+ ret = WriteFile(handle, self->user_buffer.buf,
+ (DWORD)self->user_buffer.len,
&written, &self->overlapped);
Py_END_ALLOW_THREADS
@@ -887,12 +981,12 @@ Overlapped_WSASend(OverlappedObject *self, PyObject *args)
return NULL;
}
- if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+ if (!PyArg_Parse(bufobj, "y*", &self->user_buffer))
return NULL;
#if SIZEOF_SIZE_T > SIZEOF_LONG
- if (self->write_buffer.len > (Py_ssize_t)ULONG_MAX) {
- PyBuffer_Release(&self->write_buffer);
+ if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) {
+ PyBuffer_Release(&self->user_buffer);
PyErr_SetString(PyExc_ValueError, "buffer too large");
return NULL;
}
@@ -900,8 +994,8 @@ Overlapped_WSASend(OverlappedObject *self, PyObject *args)
self->type = TYPE_WRITE;
self->handle = handle;
- wsabuf.len = (DWORD)self->write_buffer.len;
- wsabuf.buf = self->write_buffer.buf;
+ wsabuf.len = (DWORD)self->user_buffer.len;
+ wsabuf.buf = self->user_buffer.buf;
Py_BEGIN_ALLOW_THREADS
ret = WSASend((SOCKET)handle, &wsabuf, 1, &written, flags,
@@ -951,7 +1045,7 @@ Overlapped_AcceptEx(OverlappedObject *self, PyObject *args)
self->type = TYPE_ACCEPT;
self->handle = (HANDLE)ListenSocket;
- self->read_buffer = buf;
+ self->allocated_buffer = buf;
Py_BEGIN_ALLOW_THREADS
ret = Py_AcceptEx(ListenSocket, AcceptSocket, PyBytes_AS_STRING(buf),
@@ -1193,8 +1287,12 @@ static PyMethodDef Overlapped_methods[] = {
METH_NOARGS, Overlapped_cancel_doc},
{"ReadFile", (PyCFunction) Overlapped_ReadFile,
METH_VARARGS, Overlapped_ReadFile_doc},
+ {"ReadFileInto", (PyCFunction) Overlapped_ReadFileInto,
+ METH_VARARGS, Overlapped_ReadFileInto_doc},
{"WSARecv", (PyCFunction) Overlapped_WSARecv,
METH_VARARGS, Overlapped_WSARecv_doc},
+ {"WSARecvInto", (PyCFunction) Overlapped_WSARecvInto,
+ METH_VARARGS, Overlapped_WSARecvInto_doc},
{"WriteFile", (PyCFunction) Overlapped_WriteFile,
METH_VARARGS, Overlapped_WriteFile_doc},
{"WSASend", (PyCFunction) Overlapped_WSASend,