summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAndrey Egorov <andr06@gmail.com>2017-11-14 09:18:59 (GMT)
committerAndrew Svetlov <andrew.svetlov@gmail.com>2017-11-14 09:18:59 (GMT)
commite1d62e0b7cc842d6b75b4d480391f4a94e503255 (patch)
tree643f2a73d4675de5a0f21edd1c517ebc9db0f14f /Lib
parent56935a53b11b9a70f3e13e460777ec81a5b9195e (diff)
downloadcpython-e1d62e0b7cc842d6b75b4d480391f4a94e503255.zip
cpython-e1d62e0b7cc842d6b75b4d480391f4a94e503255.tar.gz
cpython-e1d62e0b7cc842d6b75b4d480391f4a94e503255.tar.bz2
bpo-32015: Asyncio looping during simultaneously socket read/write an… (#4386)
* bpo-32015: Asyncio cycling during simultaneously socket read/write and reconnection * Tests fix * Tests fix * News add * Add new unit tests
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/selector_events.py37
-rw-r--r--Lib/test/test_asyncio/test_selector_events.py78
2 files changed, 77 insertions, 38 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 00d9a7e..f3b278c 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -370,25 +370,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
- self._sock_recv(fut, False, sock, n)
+ self._sock_recv(fut, None, sock, n)
return fut
- def _sock_recv(self, fut, registered, sock, n):
+ def _sock_recv(self, fut, registered_fd, sock, n):
# _sock_recv() can add itself as an I/O callback if the operation can't
# be done immediately. Don't use it directly, call sock_recv().
- fd = sock.fileno()
- if registered:
+ if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
- self.remove_reader(fd)
+ self.remove_reader(registered_fd)
if fut.cancelled():
return
try:
data = sock.recv(n)
except (BlockingIOError, InterruptedError):
- self.add_reader(fd, self._sock_recv, fut, True, sock, n)
+ fd = sock.fileno()
+ self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
except Exception as exc:
fut.set_exception(exc)
else:
@@ -405,25 +405,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
- self._sock_recv_into(fut, False, sock, buf)
+ self._sock_recv_into(fut, None, sock, buf)
return fut
- def _sock_recv_into(self, fut, registered, sock, buf):
+ def _sock_recv_into(self, fut, registered_fd, sock, buf):
# _sock_recv_into() can add itself as an I/O callback if the operation
# can't be done immediately. Don't use it directly, call sock_recv_into().
- fd = sock.fileno()
- if registered:
+ if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
- self.remove_reader(fd)
+ self.remove_reader(registered_fd)
if fut.cancelled():
return
try:
nbytes = sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
- self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
+ fd = sock.fileno()
+ self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
except Exception as exc:
fut.set_exception(exc)
else:
@@ -444,16 +444,14 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
if data:
- self._sock_sendall(fut, False, sock, data)
+ self._sock_sendall(fut, None, sock, data)
else:
fut.set_result(None)
return fut
- def _sock_sendall(self, fut, registered, sock, data):
- fd = sock.fileno()
-
- if registered:
- self.remove_writer(fd)
+ def _sock_sendall(self, fut, registered_fd, sock, data):
+ if registered_fd is not None:
+ self.remove_writer(registered_fd)
if fut.cancelled():
return
@@ -470,7 +468,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
if n:
data = data[n:]
- self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
+ fd = sock.fileno()
+ self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
@coroutine
def sock_connect(self, sock, address):
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
index c50b3e4..a3d118e 100644
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -182,7 +182,27 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
f = self.loop.sock_recv(sock, 1024)
self.assertIsInstance(f, asyncio.Future)
- self.loop._sock_recv.assert_called_with(f, False, sock, 1024)
+ self.loop._sock_recv.assert_called_with(f, None, sock, 1024)
+
+ def test_sock_recv_reconnection(self):
+ sock = mock.Mock()
+ sock.fileno.return_value = 10
+ sock.recv.side_effect = BlockingIOError
+
+ self.loop.add_reader = mock.Mock()
+ self.loop.remove_reader = mock.Mock()
+ fut = self.loop.sock_recv(sock, 1024)
+ callback = self.loop.add_reader.call_args[0][1]
+ params = self.loop.add_reader.call_args[0][2:]
+
+ # emulate the old socket has closed, but the new one has
+ # the same fileno, so callback is called with old (closed) socket
+ sock.fileno.return_value = -1
+ sock.recv.side_effect = OSError(9)
+ callback(*params)
+
+ self.assertIsInstance(fut.exception(), OSError)
+ self.assertEqual((10,), self.loop.remove_reader.call_args[0])
def test__sock_recv_canceled_fut(self):
sock = mock.Mock()
@@ -190,7 +210,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
f = asyncio.Future(loop=self.loop)
f.cancel()
- self.loop._sock_recv(f, False, sock, 1024)
+ self.loop._sock_recv(f, None, sock, 1024)
self.assertFalse(sock.recv.called)
def test__sock_recv_unregister(self):
@@ -201,7 +221,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
f.cancel()
self.loop.remove_reader = mock.Mock()
- self.loop._sock_recv(f, True, sock, 1024)
+ self.loop._sock_recv(f, 10, sock, 1024)
self.assertEqual((10,), self.loop.remove_reader.call_args[0])
def test__sock_recv_tryagain(self):
@@ -211,8 +231,8 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.recv.side_effect = BlockingIOError
self.loop.add_reader = mock.Mock()
- self.loop._sock_recv(f, False, sock, 1024)
- self.assertEqual((10, self.loop._sock_recv, f, True, sock, 1024),
+ self.loop._sock_recv(f, None, sock, 1024)
+ self.assertEqual((10, self.loop._sock_recv, f, 10, sock, 1024),
self.loop.add_reader.call_args[0])
def test__sock_recv_exception(self):
@@ -221,7 +241,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.fileno.return_value = 10
err = sock.recv.side_effect = OSError()
- self.loop._sock_recv(f, False, sock, 1024)
+ self.loop._sock_recv(f, None, sock, 1024)
self.assertIs(err, f.exception())
def test_sock_sendall(self):
@@ -231,7 +251,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
f = self.loop.sock_sendall(sock, b'data')
self.assertIsInstance(f, asyncio.Future)
self.assertEqual(
- (f, False, sock, b'data'),
+ (f, None, sock, b'data'),
self.loop._sock_sendall.call_args[0])
def test_sock_sendall_nodata(self):
@@ -244,13 +264,33 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
self.assertIsNone(f.result())
self.assertFalse(self.loop._sock_sendall.called)
+ def test_sock_sendall_reconnection(self):
+ sock = mock.Mock()
+ sock.fileno.return_value = 10
+ sock.send.side_effect = BlockingIOError
+
+ self.loop.add_writer = mock.Mock()
+ self.loop.remove_writer = mock.Mock()
+ fut = self.loop.sock_sendall(sock, b'data')
+ callback = self.loop.add_writer.call_args[0][1]
+ params = self.loop.add_writer.call_args[0][2:]
+
+ # emulate the old socket has closed, but the new one has
+ # the same fileno, so callback is called with old (closed) socket
+ sock.fileno.return_value = -1
+ sock.send.side_effect = OSError(9)
+ callback(*params)
+
+ self.assertIsInstance(fut.exception(), OSError)
+ self.assertEqual((10,), self.loop.remove_writer.call_args[0])
+
def test__sock_sendall_canceled_fut(self):
sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
f.cancel()
- self.loop._sock_sendall(f, False, sock, b'data')
+ self.loop._sock_sendall(f, None, sock, b'data')
self.assertFalse(sock.send.called)
def test__sock_sendall_unregister(self):
@@ -261,7 +301,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
f.cancel()
self.loop.remove_writer = mock.Mock()
- self.loop._sock_sendall(f, True, sock, b'data')
+ self.loop._sock_sendall(f, 10, sock, b'data')
self.assertEqual((10,), self.loop.remove_writer.call_args[0])
def test__sock_sendall_tryagain(self):
@@ -271,9 +311,9 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.send.side_effect = BlockingIOError
self.loop.add_writer = mock.Mock()
- self.loop._sock_sendall(f, False, sock, b'data')
+ self.loop._sock_sendall(f, None, sock, b'data')
self.assertEqual(
- (10, self.loop._sock_sendall, f, True, sock, b'data'),
+ (10, self.loop._sock_sendall, f, 10, sock, b'data'),
self.loop.add_writer.call_args[0])
def test__sock_sendall_interrupted(self):
@@ -283,9 +323,9 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.send.side_effect = InterruptedError
self.loop.add_writer = mock.Mock()
- self.loop._sock_sendall(f, False, sock, b'data')
+ self.loop._sock_sendall(f, None, sock, b'data')
self.assertEqual(
- (10, self.loop._sock_sendall, f, True, sock, b'data'),
+ (10, self.loop._sock_sendall, f, 10, sock, b'data'),
self.loop.add_writer.call_args[0])
def test__sock_sendall_exception(self):
@@ -294,7 +334,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.fileno.return_value = 10
err = sock.send.side_effect = OSError()
- self.loop._sock_sendall(f, False, sock, b'data')
+ self.loop._sock_sendall(f, None, sock, b'data')
self.assertIs(f.exception(), err)
def test__sock_sendall(self):
@@ -304,7 +344,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.fileno.return_value = 10
sock.send.return_value = 4
- self.loop._sock_sendall(f, False, sock, b'data')
+ self.loop._sock_sendall(f, None, sock, b'data')
self.assertTrue(f.done())
self.assertIsNone(f.result())
@@ -316,10 +356,10 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.send.return_value = 2
self.loop.add_writer = mock.Mock()
- self.loop._sock_sendall(f, False, sock, b'data')
+ self.loop._sock_sendall(f, None, sock, b'data')
self.assertFalse(f.done())
self.assertEqual(
- (10, self.loop._sock_sendall, f, True, sock, b'ta'),
+ (10, self.loop._sock_sendall, f, 10, sock, b'ta'),
self.loop.add_writer.call_args[0])
def test__sock_sendall_none(self):
@@ -330,10 +370,10 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
sock.send.return_value = 0
self.loop.add_writer = mock.Mock()
- self.loop._sock_sendall(f, False, sock, b'data')
+ self.loop._sock_sendall(f, None, sock, b'data')
self.assertFalse(f.done())
self.assertEqual(
- (10, self.loop._sock_sendall, f, True, sock, b'data'),
+ (10, self.loop._sock_sendall, f, 10, sock, b'data'),
self.loop.add_writer.call_args[0])
def test_sock_connect_timeout(self):