From 47bbea712415e79ee224d21e518470ec70477d41 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 02:56:05 +0100 Subject: asyncio: sync with Tulip * _SelectorTransport constructor: extra parameter is now optional * Fix _SelectorDatagramTransport constructor. Only start reading after connection_made() has been called. * Fix _SelectorSslTransport.close(). Don't call protocol.connection_lost() if protocol.connection_made() was not called yet: if the SSL handshake failed or is still in progress. The close() method can be called if the creation of the connection is cancelled, by a timeout for example. --- Lib/asyncio/selector_events.py | 13 ++++++++++--- Lib/test/test_asyncio/test_selector_events.py | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index f499629..3195f62 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -467,7 +467,7 @@ class _SelectorTransport(transports._FlowControlMixin, _buffer_factory = bytearray # Constructs initial value for self._buffer. - def __init__(self, loop, sock, protocol, extra, server=None): + def __init__(self, loop, sock, protocol, extra=None, server=None): super().__init__(extra, loop) self._extra['socket'] = sock self._extra['sockname'] = sock.getsockname() @@ -479,6 +479,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._sock = sock self._sock_fd = sock.fileno() self._protocol = protocol + self._protocol_connected = True self._server = server self._buffer = self._buffer_factory() self._conn_lost = 0 # Set when call to connection_lost scheduled. @@ -555,7 +556,8 @@ class _SelectorTransport(transports._FlowControlMixin, def _call_connection_lost(self, exc): try: - self._protocol.connection_lost(exc) + if self._protocol_connected: + self._protocol.connection_lost(exc) finally: self._sock.close() self._sock = None @@ -718,6 +720,8 @@ class _SelectorSslTransport(_SelectorTransport): sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs) super().__init__(loop, sslsock, protocol, extra, server) + # the protocol connection is only made after the SSL handshake + self._protocol_connected = False self._server_hostname = server_hostname self._waiter = waiter @@ -797,6 +801,7 @@ class _SelectorSslTransport(_SelectorTransport): self._read_wants_write = False self._write_wants_read = False self._loop.add_reader(self._sock_fd, self._read_ready) + self._protocol_connected = True self._loop.call_soon(self._protocol.connection_made, self) # only wake up the waiter when connection_made() has been called self._loop.call_soon(self._wakeup_waiter) @@ -928,8 +933,10 @@ class _SelectorDatagramTransport(_SelectorTransport): waiter=None, extra=None): super().__init__(loop, sock, protocol, extra) self._address = address - self._loop.add_reader(self._sock_fd, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 5152616..f64e40d 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1427,7 +1427,7 @@ class 
SelectorSslTransportTests(test_utils.TestCase): self.assertFalse(tr.can_write_eof()) self.assertRaises(NotImplementedError, tr.write_eof) - def test_close(self): + def check_close(self): tr = self._make_one() tr.close() @@ -1439,6 +1439,19 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertEqual(tr._conn_lost, 1) self.assertEqual(1, self.loop.remove_reader_count[1]) + test_utils.run_briefly(self.loop) + + def test_close(self): + self.check_close() + self.assertTrue(self.protocol.connection_made.called) + self.assertTrue(self.protocol.connection_lost.called) + + def test_close_not_connected(self): + self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError + self.check_close() + self.assertFalse(self.protocol.connection_made.called) + self.assertFalse(self.protocol.connection_lost.called) + @unittest.skipIf(ssl is None, 'No SSL support') def test_server_hostname(self): self.ssl_transport(server_hostname='localhost') -- cgit v0.12 From 54a231d5397bda24257f253eb1aaabf1b741a0b5 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 13:33:15 +0100 Subject: asyncio doc: document Protocol state machine --- Doc/library/asyncio-protocol.rst | 8 ++++++++ Lib/asyncio/protocols.py | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index 60776d1..b6fcc48 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -374,6 +374,14 @@ The following callbacks are called on :class:`Protocol` instances: a connection. However, :meth:`eof_received` is called at most once and, if called, :meth:`data_received` won't be called after it. +State machine: + + start -> :meth:`~BaseProtocol.connection_made` + [-> :meth:`~Protocol.data_received` \*] + [-> :meth:`~Protocol.eof_received` ?] + -> :meth:`~BaseProtocol.connection_lost` -> end + + Datagram protocols ------------------ diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index 52fc25c..80fcac9 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -78,6 +78,11 @@ class Protocol(BaseProtocol): State machine of calls: start -> CM [-> DR*] [-> ER?] -> CL -> end + + * CM: connection_made() + * DR: data_received() + * ER: eof_received() + * CL: connection_lost() """ def data_received(self, data): -- cgit v0.12 From 2934262fd36c35843c01b96657047625ce2e3cf6 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 14:15:19 +0100 Subject: asyncio: sync with Tulip * Cleanup gather(): use cancelled() method instead of using private Future attribute * Fix _UnixReadPipeTransport and _UnixWritePipeTransport. Only start reading when connection_made() has been called. * Issue #23333: Fix BaseSelectorEventLoop._accept_connection(). Close the transport on error. In debug mode, log errors using call_exception_handler() --- Lib/asyncio/selector_events.py | 44 ++++++++++++++++++++++++++----- Lib/asyncio/tasks.py | 2 +- Lib/asyncio/unix_events.py | 17 +++++++----- Lib/test/test_asyncio/test_events.py | 37 +++++++++++++++++--------- Lib/test/test_asyncio/test_unix_events.py | 29 ++++++++------------ 5 files changed, 85 insertions(+), 44 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 3195f62..9147832 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -22,6 +22,7 @@ from . import futures from . import selectors from . import transports from . 
import sslproto +from .coroutines import coroutine from .log import logger @@ -181,16 +182,47 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: raise # The event loop will catch, log and ignore it. else: + extra = {'peername': addr} + accept = self._accept_connection2(protocol_factory, conn, extra, + sslcontext, server) + self.create_task(accept) + + @coroutine + def _accept_connection2(self, protocol_factory, conn, extra, + sslcontext=None, server=None): + protocol = None + transport = None + try: protocol = protocol_factory() + waiter = futures.Future(loop=self) if sslcontext: - self._make_ssl_transport( - conn, protocol, sslcontext, - server_side=True, extra={'peername': addr}, server=server) + transport = self._make_ssl_transport( + conn, protocol, sslcontext, waiter=waiter, + server_side=True, extra=extra, server=server) else: - self._make_socket_transport( - conn, protocol , extra={'peername': addr}, + transport = self._make_socket_transport( + conn, protocol, waiter=waiter, extra=extra, server=server) - # It's now up to the protocol to handle the connection. + + try: + yield from waiter + except: + transport.close() + raise + + # It's now up to the protocol to handle the connection. + except Exception as exc: + if self.get_debug(): + context = { + 'message': ('Error on transport creation ' + 'for incoming connection'), + 'exception': exc, + } + if protocol is not None: + context['protocol'] = protocol + if transport is not None: + context['transport'] = transport + self.call_exception_handler(context) def add_reader(self, fd, callback, *args): """Add a reader callback.""" diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 63412a9..4f19a25 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -592,7 +592,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): fut.exception() return - if fut._state == futures._CANCELLED: + if fut.cancelled(): res = futures.CancelledError() if not return_exceptions: outer.set_exception(res) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 67973f1..7e1265a 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -298,8 +298,10 @@ class _UnixReadPipeTransport(transports.ReadTransport): _set_nonblocking(self._fileno) self._protocol = protocol self._closing = False - self._loop.add_reader(self._fileno, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) @@ -401,13 +403,16 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. - # On AIX, the reader trick only works for sockets. - # On other platforms it works for pipes and sockets. - # (Exception: OS X 10.4? Issue #19294.) + self._loop.call_soon(self._protocol.connection_made, self) + + # On AIX, the reader trick (to be notified when the read end of the + # socket is closed) only works for sockets. On other platforms it + # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 
if is_socket or not sys.platform.startswith("aix"): - self._loop.add_reader(self._fileno, self._read_ready) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) - self._loop.call_soon(self._protocol.connection_made, self) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(waiter._set_result_unless_cancelled, None) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index a38c90e..12af62b 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -886,13 +886,18 @@ class EventLoopTestsMixin: if hasattr(sslcontext_client, 'check_hostname'): sslcontext_client.check_hostname = True + # no CA loaded f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) - with test_utils.disable_logger(): - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex(ssl.SSLError, + 'certificate verify failed '): + self.loop.run_until_complete(f_c) + + # execute the loop to log the connection error + test_utils.run_briefly(self.loop) # close connection self.assertIsNone(proto.transport) @@ -919,15 +924,20 @@ class EventLoopTestsMixin: f_c = self.loop.create_unix_connection(MyProto, path, ssl=sslcontext_client, server_hostname='invalid') - with test_utils.disable_logger(): - with self.assertRaisesRegex(ssl.SSLError, - 'certificate verify failed '): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex(ssl.SSLError, + 'certificate verify failed '): + self.loop.run_until_complete(f_c) + + # execute the loop to log the connection error + test_utils.run_briefly(self.loop) # close connection self.assertIsNone(proto.transport) server.close() + def test_legacy_create_unix_server_ssl_verify_failed(self): with test_utils.force_legacy_ssl_support(): self.test_create_unix_server_ssl_verify_failed() @@ -949,11 +959,12 @@ class EventLoopTestsMixin: # incorrect server_hostname f_c = self.loop.create_connection(MyProto, host, port, ssl=sslcontext_client) - with test_utils.disable_logger(): - with self.assertRaisesRegex( - ssl.CertificateError, - "hostname '127.0.0.1' doesn't match 'localhost'"): - self.loop.run_until_complete(f_c) + with mock.patch.object(self.loop, 'call_exception_handler'): + with test_utils.disable_logger(): + with self.assertRaisesRegex( + ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): + self.loop.run_until_complete(f_c) # close connection proto.transport.close() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 126196d..41249ff 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -350,16 +350,13 @@ class UnixReadPipeTransportTests(test_utils.TestCase): return transport def test_ctor(self): - tr = self.read_pipe_transport() - self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.protocol.connection_made.assert_called_with(tr) + waiter = asyncio.Future(loop=self.loop) + tr = self.read_pipe_transport(waiter=waiter) + self.loop.run_until_complete(waiter) - def test_ctor_with_waiter(self): - fut = 
asyncio.Future(loop=self.loop) - tr = self.read_pipe_transport(waiter=fut) - test_utils.run_briefly(self.loop) - self.assertIsNone(fut.result()) + self.protocol.connection_made.assert_called_with(tr) + self.loop.assert_reader(5, tr._read_ready) + self.assertIsNone(waiter.result()) @mock.patch('os.read') def test__read_ready(self, m_read): @@ -502,17 +499,13 @@ class UnixWritePipeTransportTests(test_utils.TestCase): return transport def test_ctor(self): - tr = self.write_pipe_transport() - self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.protocol.connection_made.assert_called_with(tr) + waiter = asyncio.Future(loop=self.loop) + tr = self.write_pipe_transport(waiter=waiter) + self.loop.run_until_complete(waiter) - def test_ctor_with_waiter(self): - fut = asyncio.Future(loop=self.loop) - tr = self.write_pipe_transport(waiter=fut) + self.protocol.connection_made.assert_called_with(tr) self.loop.assert_reader(5, tr._read_ready) - test_utils.run_briefly(self.loop) - self.assertEqual(None, fut.result()) + self.assertEqual(None, waiter.result()) def test_can_write_eof(self): tr = self.write_pipe_transport() -- cgit v0.12 From fa5d6a5ff3ca247d9c2eaf51853ff39c98c09f4a Mon Sep 17 00:00:00 2001 From: Stefan Krah Date: Thu, 29 Jan 2015 14:27:23 +0100 Subject: Issue #22668: Ensure that format strings survive slicing after casting. --- Include/memoryobject.h | 4 ++-- Lib/test/test_memoryview.py | 19 +++++++++++++++ Objects/memoryobject.c | 56 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 73 insertions(+), 6 deletions(-) diff --git a/Include/memoryobject.h b/Include/memoryobject.h index c2e1194..382ca92 100644 --- a/Include/memoryobject.h +++ b/Include/memoryobject.h @@ -45,7 +45,7 @@ typedef struct { } _PyManagedBufferObject; -/* static storage used for casting between formats */ +/* deprecated, removed in 3.5 */ #define _Py_MEMORYVIEW_MAX_FORMAT 3 /* must be >= 3 */ /* memoryview state flags */ @@ -62,7 +62,7 @@ typedef struct { int flags; /* state flags */ Py_ssize_t exports; /* number of buffer re-exports */ Py_buffer view; /* private copy of the exporter's view */ - char format[_Py_MEMORYVIEW_MAX_FORMAT]; /* used for casting */ + char format[_Py_MEMORYVIEW_MAX_FORMAT]; /* deprecated, removed in 3.5 */ PyObject *weakreflist; Py_ssize_t ob_array[1]; /* shape, strides, suboffsets */ } PyMemoryViewObject; diff --git a/Lib/test/test_memoryview.py b/Lib/test/test_memoryview.py index e7df8a7..bd9d0d4 100644 --- a/Lib/test/test_memoryview.py +++ b/Lib/test/test_memoryview.py @@ -360,6 +360,25 @@ class AbstractMemoryTests: self.assertEqual(list(reversed(m)), aslist) self.assertEqual(list(reversed(m)), list(m[::-1])) + def test_issue22668(self): + m = memoryview(bytes(range(8))) + b = m.cast('H') + c = b[0:2] + d = memoryview(b) + + del b + + self.assertEqual(c[0], 256) + self.assertEqual(d[0], 256) + self.assertEqual(c.format, "H") + self.assertEqual(d.format, "H") + + _ = m.cast('I') + self.assertEqual(c[0], 256) + self.assertEqual(d[0], 256) + self.assertEqual(c.format, "H") + self.assertEqual(d.format, "H") + # Variations on source objects for the buffer: bytes-like objects, then arrays # with itemsize > 1. 
diff --git a/Objects/memoryobject.c b/Objects/memoryobject.c index cb644b8..0be8493 100644 --- a/Objects/memoryobject.c +++ b/Objects/memoryobject.c @@ -1135,6 +1135,51 @@ get_native_fmtchar(char *result, const char *fmt) return -1; } +Py_LOCAL_INLINE(char *) +get_native_fmtstr(const char *fmt) +{ + int at = 0; + + if (fmt[0] == '@') { + at = 1; + fmt++; + } + if (fmt[0] == '\0' || fmt[1] != '\0') { + return NULL; + } + +#define RETURN(s) do { return at ? "@" s : s; } while (0) + + switch (fmt[0]) { + case 'c': RETURN("c"); + case 'b': RETURN("b"); + case 'B': RETURN("B"); + case 'h': RETURN("h"); + case 'H': RETURN("H"); + case 'i': RETURN("i"); + case 'I': RETURN("I"); + case 'l': RETURN("l"); + case 'L': RETURN("L"); + #ifdef HAVE_LONG_LONG + case 'q': RETURN("q"); + case 'Q': RETURN("Q"); + #endif + case 'n': RETURN("n"); + case 'N': RETURN("N"); + case 'f': RETURN("f"); + case 'd': RETURN("d"); + #ifdef HAVE_C99_BOOL + case '?': RETURN("?"); + #else + case '?': RETURN("?"); + #endif + case 'P': RETURN("P"); + } + + return NULL; +} + + /* Cast a memoryview's data type to 'format'. The input array must be C-contiguous. At least one of input-format, output-format must have byte size. The output array is 1-D, with the same byte length as the @@ -1184,10 +1229,13 @@ cast_to_1D(PyMemoryViewObject *mv, PyObject *format) goto out; } - strncpy(mv->format, PyBytes_AS_STRING(asciifmt), - _Py_MEMORYVIEW_MAX_FORMAT); - mv->format[_Py_MEMORYVIEW_MAX_FORMAT-1] = '\0'; - view->format = mv->format; + view->format = get_native_fmtstr(PyBytes_AS_STRING(asciifmt)); + if (view->format == NULL) { + /* NOT_REACHED: get_native_fmtchar() already validates the format. */ + PyErr_SetString(PyExc_RuntimeError, + "memoryview: internal error"); + goto out; + } view->itemsize = itemsize; view->ndim = 1; -- cgit v0.12 From fc341bd4c5da88b31aba2806f4288d19e945ad1d Mon Sep 17 00:00:00 2001 From: Stefan Krah Date: Thu, 29 Jan 2015 14:33:37 +0100 Subject: Whitespace. --- Lib/test/test_memoryview.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_memoryview.py b/Lib/test/test_memoryview.py index bd9d0d4..69b9d2d 100644 --- a/Lib/test/test_memoryview.py +++ b/Lib/test/test_memoryview.py @@ -367,12 +367,12 @@ class AbstractMemoryTests: d = memoryview(b) del b - + self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) self.assertEqual(c.format, "H") self.assertEqual(d.format, "H") - + _ = m.cast('I') self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) -- cgit v0.12 From 3c0cf05901ea5cca0694734fd4a64b2bc267cb41 Mon Sep 17 00:00:00 2001 From: Stefan Krah Date: Thu, 29 Jan 2015 17:33:31 +0100 Subject: Issue #22668: Remove endianness assumption in test. 
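A quick way to see the byte-order dependence being removed (illustrative only,
not part of this patch): the bytes b'\x00\x01' read with format 'H' give 256 on
a little-endian host but 1 on a big-endian one, so the old expected values only
held on little-endian machines. For example::

    >>> import struct
    >>> struct.unpack('<H', b'\x00\x01')    # little-endian reading
    (256,)
    >>> struct.unpack('>H', b'\x00\x01')    # big-endian reading
    (1,)

Building the buffer from array.array('H', [256, 256, 256, 256]) instead makes
the expected values independent of the host byte order.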
--- Lib/test/test_memoryview.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_memoryview.py b/Lib/test/test_memoryview.py index bd9d0d4..4bc3133 100644 --- a/Lib/test/test_memoryview.py +++ b/Lib/test/test_memoryview.py @@ -361,18 +361,20 @@ class AbstractMemoryTests: self.assertEqual(list(reversed(m)), list(m[::-1])) def test_issue22668(self): - m = memoryview(bytes(range(8))) + a = array.array('H', [256, 256, 256, 256]) + x = memoryview(a) + m = x.cast('B') b = m.cast('H') c = b[0:2] d = memoryview(b) del b - + self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) self.assertEqual(c.format, "H") self.assertEqual(d.format, "H") - + _ = m.cast('I') self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) -- cgit v0.12 From 978a9afc6af6c137065bdcf7ae4ef5450e5b2ec2 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 29 Jan 2015 17:50:58 +0100 Subject: Issue #23243, asyncio: Emit a ResourceWarning when an event loop or a transport is not explicitly closed. Close also explicitly transports in test_sslproto. --- Lib/asyncio/base_events.py | 11 +++++++++++ Lib/asyncio/base_subprocess.py | 19 ++++++++++++++++++- Lib/asyncio/futures.py | 6 +++--- Lib/asyncio/proactor_events.py | 11 +++++++++++ Lib/asyncio/selector_events.py | 16 ++++++++++++++++ Lib/asyncio/sslproto.py | 13 +++++++++++++ Lib/asyncio/unix_events.py | 19 +++++++++++++++++++ Lib/asyncio/windows_utils.py | 6 +++++- Lib/test/test_asyncio/test_proactor_events.py | 6 +++++- Lib/test/test_asyncio/test_sslproto.py | 7 +++---- 10 files changed, 104 insertions(+), 10 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index e40d3ad..7108f25 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -26,6 +26,7 @@ import threading import time import traceback import sys +import warnings from . import coroutines from . import events @@ -333,6 +334,16 @@ class BaseEventLoop(events.AbstractEventLoop): """Returns True if the event loop was closed.""" return self._closed + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self.is_closed(): + warnings.warn("unclosed event loop %r" % self, ResourceWarning) + if not self.is_running(): + self.close() + def is_running(self): """Returns True if the event loop is running.""" return (self._owner is not None) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 81c6f1a..651a9a2 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -1,5 +1,7 @@ import collections import subprocess +import sys +import warnings from . import protocols from . 
import transports @@ -13,6 +15,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): stdin, stdout, stderr, bufsize, extra=None, **kwargs): super().__init__(extra) + self._closed = False self._protocol = protocol self._loop = loop self._pid = None @@ -40,7 +43,10 @@ class BaseSubprocessTransport(transports.SubprocessTransport): program, self._pid) def __repr__(self): - info = [self.__class__.__name__, 'pid=%s' % self._pid] + info = [self.__class__.__name__] + if self._closed: + info.append('closed') + info.append('pid=%s' % self._pid) if self._returncode is not None: info.append('returncode=%s' % self._returncode) @@ -70,6 +76,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): raise NotImplementedError def close(self): + self._closed = True for proto in self._pipes.values(): if proto is None: continue @@ -77,6 +84,15 @@ class BaseSubprocessTransport(transports.SubprocessTransport): if self._returncode is None: self.terminate() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self._closed: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def get_pid(self): return self._pid @@ -104,6 +120,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): Function called when an exception is raised during the creation of a subprocess. """ + self._closed = True if self._loop.get_debug(): logger.warning('Exception during subprocess creation, ' 'kill the subprocess %r', diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 19212a9..2c741fd 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -195,9 +195,9 @@ class Future: info = self._repr_info() return '<%s %s>' % (self.__class__.__name__, ' '.join(info)) - # On Python 3.3 or older, objects with a destructor part of a reference - # cycle are never destroyed. It's not more the case on Python 3.4 thanks to - # the PEP 442. + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. if _PY34: def __del__(self): if not self._log_traceback: diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 0f533a5..65de926 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -7,6 +7,8 @@ proactor is only implemented on Windows with IOCP. __all__ = ['BaseProactorEventLoop'] import socket +import sys +import warnings from . import base_events from . import constants @@ -74,6 +76,15 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, self._read_fut.cancel() self._read_fut = None + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. 
+ if sys.version_info >= (3, 4): + def __del__(self): + if self._sock is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def _fatal_error(self, exc, message='Fatal error on pipe transport'): if isinstance(exc, (BrokenPipeError, ConnectionResetError)): if self._loop.get_debug(): diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 9147832..4bd6dc8 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -10,6 +10,8 @@ import collections import errno import functools import socket +import sys +import warnings try: import ssl except ImportError: # pragma: no cover @@ -499,6 +501,11 @@ class _SelectorTransport(transports._FlowControlMixin, _buffer_factory = bytearray # Constructs initial value for self._buffer. + # Attribute used in the destructor: it must be set even if the constructor + # is not called (see _SelectorSslTransport which may start by raising an + # exception) + _sock = None + def __init__(self, loop, sock, protocol, extra=None, server=None): super().__init__(extra, loop) self._extra['socket'] = sock @@ -559,6 +566,15 @@ class _SelectorTransport(transports._FlowControlMixin, self._conn_lost += 1 self._loop.call_soon(self._call_connection_lost, None) + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._sock is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._sock.close() + def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. if isinstance(exc, (BrokenPipeError, diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index fc809b9..235855e 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -1,4 +1,6 @@ import collections +import sys +import warnings try: import ssl except ImportError: # pragma: no cover @@ -295,6 +297,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin, self._loop = loop self._ssl_protocol = ssl_protocol self._app_protocol = app_protocol + self._closed = False def get_extra_info(self, name, default=None): """Get optional transport information.""" @@ -308,8 +311,18 @@ class _SSLProtocolTransport(transports._FlowControlMixin, protocol's connection_lost() method will (eventually) called with None as its argument. """ + self._closed = True self._ssl_protocol._start_shutdown() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if not self._closed: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self.close() + def pause_reading(self): """Pause the receiving end. diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 7e1265a..b06f1b2 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -8,6 +8,7 @@ import stat import subprocess import sys import threading +import warnings from . import base_events @@ -353,6 +354,15 @@ class _UnixReadPipeTransport(transports.ReadTransport): if not self._closing: self._close(None) + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. 
+ if sys.version_info >= (3, 4): + def __del__(self): + if self._pipe is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._pipe.close() + def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only if (isinstance(exc, OSError) and exc.errno == errno.EIO): @@ -529,6 +539,15 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, # write_eof is all what we needed to close the write pipe self.write_eof() + # On Python 3.3 and older, objects with a destructor part of a reference + # cycle are never destroyed. It's not more the case on Python 3.4 thanks + # to the PEP 442. + if sys.version_info >= (3, 4): + def __del__(self): + if self._pipe is not None: + warnings.warn("unclosed transport %r" % self, ResourceWarning) + self._pipe.close() + def abort(self): self._close(None) diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py index 5f8327e..870cd13 100644 --- a/Lib/asyncio/windows_utils.py +++ b/Lib/asyncio/windows_utils.py @@ -14,6 +14,7 @@ import os import socket import subprocess import tempfile +import warnings __all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle'] @@ -156,7 +157,10 @@ class PipeHandle: CloseHandle(self._handle) self._handle = None - __del__ = close + def __del__(self): + if self._handle is not None: + warnings.warn("unclosed %r" % self, ResourceWarning) + self.close() def __enter__(self): return self diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 33a8a67..fcd9ab1 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -499,8 +499,12 @@ class BaseProactorEventLoopTests(test_utils.TestCase): self.proactor.accept.assert_called_with(self.sock) def test_socketpair(self): + class EventLoop(BaseProactorEventLoop): + # override the destructor to not log a ResourceWarning + def __del__(self): + pass self.assertRaises( - NotImplementedError, BaseProactorEventLoop, self.proactor) + NotImplementedError, EventLoop, self.proactor) def test_make_socket_transport(self): tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol()) diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py index 148e30d..a72967e 100644 --- a/Lib/test/test_asyncio/test_sslproto.py +++ b/Lib/test/test_asyncio/test_sslproto.py @@ -22,7 +22,9 @@ class SslProtoHandshakeTests(test_utils.TestCase): def ssl_protocol(self, waiter=None): sslcontext = test_utils.dummy_ssl_context() app_proto = asyncio.Protocol() - return sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter) + proto = sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter) + self.addCleanup(proto._app_transport.close) + return proto def connection_made(self, ssl_proto, do_handshake=None): transport = mock.Mock() @@ -56,9 +58,6 @@ class SslProtoHandshakeTests(test_utils.TestCase): with test_utils.disable_logger(): self.loop.run_until_complete(handshake_fut) - # Close the transport - ssl_proto._app_transport.close() - def test_eof_received_waiter(self): waiter = asyncio.Future(loop=self.loop) ssl_proto = self.ssl_protocol(waiter) -- cgit v0.12 From 47cd10d7a903773f574fc93220dbca850067fa0c Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:05:19 +0100 Subject: asyncio: sync with Tulip Issue #23347: send_signal(), kill() and terminate() methods of BaseSubprocessTransport now check if the transport was closed and if the process exited. 
Issue #23347: Refactor creation of subprocess transports. Changes on BaseSubprocessTransport: * Add a wait() method to wait until the child process exit * The constructor now accepts an optional waiter parameter. The _post_init() coroutine must not be called explicitly anymore. It makes subprocess transports closer to other transports, and it gives more freedom if we want later to change completly how subprocess transports are created. * close() now kills the process instead of kindly terminate it: the child process may ignore SIGTERM and continue to run. Call explicitly terminate() and wait() if you want to kindly terminate the child process. * close() now logs a warning in debug mode if the process is still running and needs to be killed * _make_subprocess_transport() is now fully asynchronous again: if the creation of the transport failed, wait asynchronously for the process eixt. Before the wait was synchronous. This change requires close() to *kill*, and not terminate, the child process. * Remove the _kill_wait() method, replaced with a more agressive close() method. It fixes _make_subprocess_transport() on error. BaseSubprocessTransport.close() calls the close() method of pipe transports, whereas _kill_wait() closed directly pipes of the subprocess.Popen object without unregistering file descriptors from the selector (which caused severe bugs). These changes simplifies the code of subprocess.py. --- Lib/asyncio/base_subprocess.py | 108 +++++++++++++++++++------------ Lib/asyncio/subprocess.py | 40 +----------- Lib/asyncio/unix_events.py | 15 +++-- Lib/asyncio/windows_events.py | 7 +- Lib/test/test_asyncio/test_events.py | 35 +++++----- Lib/test/test_asyncio/test_subprocess.py | 65 +++++++++++++++++++ 6 files changed, 166 insertions(+), 104 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 651a9a2..001f9b8 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -3,6 +3,7 @@ import subprocess import sys import warnings +from . import futures from . import protocols from . 
import transports from .coroutines import coroutine @@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def __init__(self, loop, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=None, **kwargs): + waiter=None, extra=None, **kwargs): super().__init__(extra) self._closed = False self._protocol = protocol self._loop = loop + self._proc = None self._pid = None - + self._returncode = None + self._exit_waiters = [] + self._pending_calls = collections.deque() self._pipes = {} + self._finished = False + if stdin == subprocess.PIPE: self._pipes[0] = None if stdout == subprocess.PIPE: self._pipes[1] = None if stderr == subprocess.PIPE: self._pipes[2] = None - self._pending_calls = collections.deque() - self._finished = False - self._returncode = None + + # Create the child process: set the _proc attribute self._start(args=args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, bufsize=bufsize, **kwargs) self._pid = self._proc.pid self._extra['subprocess'] = self._proc + if self._loop.get_debug(): if isinstance(args, (bytes, str)): program = args @@ -42,6 +48,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport): logger.debug('process %r created: pid %s', program, self._pid) + self._loop.create_task(self._connect_pipes(waiter)) + def __repr__(self): info = [self.__class__.__name__] if self._closed: @@ -77,12 +85,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport): def close(self): self._closed = True + for proto in self._pipes.values(): if proto is None: continue proto.pipe.close() - if self._returncode is None: - self.terminate() + + if self._proc is not None and self._returncode is None: + if self._loop.get_debug(): + logger.warning('Close running child process: kill %r', self) + + try: + self._proc.kill() + except ProcessLookupError: + pass + + # Don't clear the _proc reference yet because _post_init() may + # still run # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks @@ -105,59 +124,42 @@ class BaseSubprocessTransport(transports.SubprocessTransport): else: return None + def _check_proc(self): + if self._closed: + raise ValueError("operation on closed transport") + if self._proc is None: + raise ProcessLookupError() + def send_signal(self, signal): + self._check_proc() self._proc.send_signal(signal) def terminate(self): + self._check_proc() self._proc.terminate() def kill(self): + self._check_proc() self._proc.kill() - def _kill_wait(self): - """Close pipes, kill the subprocess and read its return status. - - Function called when an exception is raised during the creation - of a subprocess. 
- """ - self._closed = True - if self._loop.get_debug(): - logger.warning('Exception during subprocess creation, ' - 'kill the subprocess %r', - self, - exc_info=True) - - proc = self._proc - if proc.stdout: - proc.stdout.close() - if proc.stderr: - proc.stderr.close() - if proc.stdin: - proc.stdin.close() - - try: - proc.kill() - except ProcessLookupError: - pass - self._returncode = proc.wait() - - self.close() - @coroutine - def _post_init(self): + def _connect_pipes(self, waiter): try: proc = self._proc loop = self._loop + if proc.stdin is not None: _, pipe = yield from loop.connect_write_pipe( lambda: WriteSubprocessPipeProto(self, 0), proc.stdin) self._pipes[0] = pipe + if proc.stdout is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 1), proc.stdout) self._pipes[1] = pipe + if proc.stderr is not None: _, pipe = yield from loop.connect_read_pipe( lambda: ReadSubprocessPipeProto(self, 2), @@ -166,13 +168,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport): assert self._pending_calls is not None - self._loop.call_soon(self._protocol.connection_made, self) + loop.call_soon(self._protocol.connection_made, self) for callback, data in self._pending_calls: - self._loop.call_soon(callback, *data) + loop.call_soon(callback, *data) self._pending_calls = None - except: - self._kill_wait() - raise + except Exception as exc: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(exc) + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(None) def _call(self, cb, *data): if self._pending_calls is not None: @@ -197,6 +202,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport): self._call(self._protocol.process_exited) self._try_finish() + # wake up futures waiting for wait() + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(returncode) + self._exit_waiters = None + + def wait(self): + """Wait until the process exit and return the process return code. 
+ + This method is a coroutine.""" + if self._returncode is not None: + return self._returncode + + waiter = futures.Future(loop=self._loop) + self._exit_waiters.append(waiter) + return (yield from waiter) + def _try_finish(self): assert not self._finished if self._returncode is None: @@ -210,9 +232,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport): try: self._protocol.connection_lost(exc) finally: + self._loop = None self._proc = None self._protocol = None - self._loop = None class WriteSubprocessPipeProto(protocols.BaseProtocol): diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index c848a21..d0c9779 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, super().__init__(loop=loop) self._limit = limit self.stdin = self.stdout = self.stderr = None - self.waiter = futures.Future(loop=loop) - self._waiters = collections.deque() self._transport = None def __repr__(self): @@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, reader=None, loop=self._loop) - if not self.waiter.cancelled(): - self.waiter.set_result(None) - def pipe_data_received(self, fd, data): if fd == 1: reader = self.stdout @@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, reader.set_exception(exc) def process_exited(self): - returncode = self._transport.get_returncode() self._transport.close() self._transport = None - # wake up futures waiting for wait() - while self._waiters: - waiter = self._waiters.popleft() - if not waiter.cancelled(): - waiter.set_result(returncode) - class Process: def __init__(self, transport, protocol, loop): @@ -124,30 +112,18 @@ class Process: @coroutine def wait(self): - """Wait until the process exit and return the process return code.""" - returncode = self._transport.get_returncode() - if returncode is not None: - return returncode - - waiter = futures.Future(loop=self._loop) - self._protocol._waiters.append(waiter) - yield from waiter - return waiter.result() + """Wait until the process exit and return the process return code. - def _check_alive(self): - if self._transport.get_returncode() is not None: - raise ProcessLookupError() + This method is a coroutine.""" + return (yield from self._transport.wait()) def send_signal(self, signal): - self._check_alive() self._transport.send_signal(signal) def terminate(self): - self._check_alive() self._transport.terminate() def kill(self): - self._check_alive() self._transport.kill() @coroutine @@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, protocol_factory, cmd, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) @coroutine @@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None, program, *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - try: - yield from protocol.waiter - except: - transport._kill_wait() - raise return Process(transport, protocol, loop) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index b06f1b2..3ecdfd2 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -16,6 +16,7 @@ from . import base_subprocess from . import constants from . import coroutines from . import events +from . import futures from . import selector_events from . import selectors from . 
import transports @@ -175,16 +176,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: + waiter = futures.Future(loop=self) transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) + + watcher.add_child_handler(transp.get_pid(), + self._child_watcher_callback, transp) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp.wait() raise - watcher.add_child_handler(transp.get_pid(), - self._child_watcher_callback, transp) return transp @@ -774,7 +779,7 @@ class SafeChildWatcher(BaseChildWatcher): pass def add_child_handler(self, pid, callback, *args): - self._callbacks[pid] = callback, args + self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. self._do_waitpid(pid) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 94aafb6..437eb0a 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): + waiter = futures.Future(loop=self) transp = _WindowsSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, - extra=extra, **kwargs) + waiter=waiter, extra=extra, + **kwargs) try: - yield from transp._post_init() + yield from waiter except: transp.close() + yield from transp.wait() raise return transp diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 12af62b..4b957d8 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1551,9 +1551,10 @@ class SubprocessTestsMixin: stdin = transp.get_pipe_transport(0) stdin.write(b'Python The Winner') self.loop.run_until_complete(proto.got_data[1].wait()) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) self.assertEqual(b'Python The Winner', proto.data[1]) def test_subprocess_interactive(self): @@ -1567,21 +1568,20 @@ class SubprocessTestsMixin: self.loop.run_until_complete(proto.connected) self.assertEqual('CONNECTED', proto.state) - try: - stdin = transp.get_pipe_transport(0) - stdin.write(b'Python ') - self.loop.run_until_complete(proto.got_data[1].wait()) - proto.got_data[1].clear() - self.assertEqual(b'Python ', proto.data[1]) - - stdin.write(b'The Winner') - self.loop.run_until_complete(proto.got_data[1].wait()) - self.assertEqual(b'Python The Winner', proto.data[1]) - finally: - transp.close() + stdin = transp.get_pipe_transport(0) + stdin.write(b'Python ') + self.loop.run_until_complete(proto.got_data[1].wait()) + proto.got_data[1].clear() + self.assertEqual(b'Python ', proto.data[1]) + stdin.write(b'The Winner') + self.loop.run_until_complete(proto.got_data[1].wait()) + self.assertEqual(b'Python The Winner', proto.data[1]) + + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_shell(self): connect = self.loop.subprocess_shell( @@ -1739,9 +1739,10 @@ class SubprocessTestsMixin: # 
GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using # WriteFile() we get ERROR_BROKEN_PIPE as expected.) self.assertEqual(b'ERR:OSError', proto.data[2]) - transp.close() + with test_utils.disable_logger(): + transp.close() self.loop.run_until_complete(proto.completed) - self.check_terminated(proto.returncode) + self.check_killed(proto.returncode) def test_subprocess_wait_no_same_group(self): # start the new process in a new session diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ecc2c9d..4f197f3 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -4,6 +4,7 @@ import unittest from unittest import mock import asyncio +from asyncio import base_subprocess from asyncio import subprocess from asyncio import test_utils try: @@ -23,6 +24,70 @@ PROGRAM_CAT = [ 'data = sys.stdin.buffer.read()', 'sys.stdout.buffer.write(data)'))] +class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport): + def _start(self, *args, **kwargs): + self._proc = mock.Mock() + self._proc.stdin = None + self._proc.stdout = None + self._proc.stderr = None + + +class SubprocessTransportTests(test_utils.TestCase): + def setUp(self): + self.loop = self.new_test_loop() + self.set_event_loop(self.loop) + + + def create_transport(self, waiter=None): + protocol = mock.Mock() + protocol.connection_made._is_coroutine = False + protocol.process_exited._is_coroutine = False + transport = TestSubprocessTransport( + self.loop, protocol, ['test'], False, + None, None, None, 0, waiter=waiter) + return (transport, protocol) + + def test_close(self): + waiter = asyncio.Future(loop=self.loop) + transport, protocol = self.create_transport(waiter) + transport._process_exited(0) + transport.close() + + # The loop didn't run yet + self.assertFalse(protocol.connection_made.called) + + # methods must raise ProcessLookupError if the transport was closed + self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM) + self.assertRaises(ValueError, transport.terminate) + self.assertRaises(ValueError, transport.kill) + + self.loop.run_until_complete(waiter) + + def test_proc_exited(self): + waiter = asyncio.Future(loop=self.loop) + transport, protocol = self.create_transport(waiter) + transport._process_exited(6) + self.loop.run_until_complete(waiter) + + self.assertEqual(transport.get_returncode(), 6) + + self.assertTrue(protocol.connection_made.called) + self.assertTrue(protocol.process_exited.called) + self.assertTrue(protocol.connection_lost.called) + self.assertEqual(protocol.connection_lost.call_args[0], (None,)) + + self.assertFalse(transport._closed) + self.assertIsNone(transport._loop) + self.assertIsNone(transport._proc) + self.assertIsNone(transport._protocol) + + # methods must raise ProcessLookupError if the process exited + self.assertRaises(ProcessLookupError, + transport.send_signal, signal.SIGTERM) + self.assertRaises(ProcessLookupError, transport.terminate) + self.assertRaises(ProcessLookupError, transport.kill) + + class SubprocessMixin: def test_stdin_stdout(self): -- cgit v0.12 From 0698638d797ca864f069d828dd2ea6d55b87a04e Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:11:42 +0100 Subject: asyncio: Fix ResourceWarning in test_subprocess.test_proc_exit() --- Lib/test/test_asyncio/test_subprocess.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 4f197f3..d4b71b7 100644 --- 
a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -87,6 +87,8 @@ class SubprocessTransportTests(test_utils.TestCase): self.assertRaises(ProcessLookupError, transport.terminate) self.assertRaises(ProcessLookupError, transport.kill) + transport.close() + class SubprocessMixin: -- cgit v0.12 From 1241ecc21bbb3f86734a3071b93514a12dd1ee23 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:16:14 +0100 Subject: Issue #23347, asyncio: Make BaseSubprocessTransport.wait() private --- Lib/asyncio/base_subprocess.py | 2 +- Lib/asyncio/subprocess.py | 2 +- Lib/asyncio/unix_events.py | 2 +- Lib/asyncio/windows_events.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 001f9b8..70676ab 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -208,7 +208,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): waiter.set_result(returncode) self._exit_waiters = None - def wait(self): + def _wait(self): """Wait until the process exit and return the process return code. This method is a coroutine.""" diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index d0c9779..4600a9f 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -115,7 +115,7 @@ class Process: """Wait until the process exit and return the process return code. This method is a coroutine.""" - return (yield from self._transport.wait()) + return (yield from self._transport._wait()) def send_signal(self, signal): self._transport.send_signal(signal) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 3ecdfd2..1fc39ab 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -188,7 +188,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): yield from waiter except: transp.close() - yield from transp.wait() + yield from transp._wait() raise return transp diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 437eb0a..c4bffc4 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -375,7 +375,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): yield from waiter except: transp.close() - yield from transp.wait() + yield from transp._wait() raise return transp -- cgit v0.12 From 7a55b88d9cf55539d28e2aac6ced20c780984158 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:37:04 +0100 Subject: Issue #21962, asyncio doc: Suggest the usage of wait_for() to replace the lack of timeout parameter for locks and queues. 
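A minimal sketch of the pattern this documentation change recommends
(illustrative only, not part of this patch; get_with_timeout() is an invented
helper name)::

    import asyncio

    @asyncio.coroutine
    def get_with_timeout(queue):
        # Queue.get() has no timeout parameter: wrap the coroutine in
        # wait_for(), which cancels it when the timeout elapses.
        try:
            item = yield from asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            return None
        return item

The same wrapping works for Lock.acquire(), Semaphore.acquire() and the other
blocking coroutines of the asyncio locks and queues.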
--- Doc/library/asyncio-sync.rst | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/Doc/library/asyncio-sync.rst b/Doc/library/asyncio-sync.rst index c63447b..4cc9a96 100644 --- a/Doc/library/asyncio-sync.rst +++ b/Doc/library/asyncio-sync.rst @@ -4,6 +4,29 @@ Synchronization primitives ========================== +Locks: + +* :class:`Lock` +* :class:`Event` +* :class:`Condition` +* :class:`Semaphore` +* :class:`BoundedSemaphore` + +Queues: + +* :class:`Queue` +* :class:`PriorityQueue` +* :class:`LifoQueue` +* :class:`JoinableQueue` + +asyncio locks and queues API were designed to be close to classes of the +:mod:`threading` module (:class:`~threading.Lock`, :class:`~threading.Event`, +:class:`~threading.Condition`, :class:`~threading.Semaphore`, +:class:`~threading.BoundedSemaphore`) and the :mod:`queue` module +(:class:`~queue.Queue`, :class:`~queue.PriorityQueue`, +:class:`~queue.LifoQueue`), but they have no *timeout* parameter. The +:func:`asyncio.wait_for` function can be used to cancel a task after a timeout. + Locks ----- -- cgit v0.12 From 1077dee4575ccc43c10515de50d7c100d6ce9455 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 00:55:58 +0100 Subject: asyncio doc: add a section about task cancellation --- Doc/library/asyncio-dev.rst | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 7083e60..72a06f5 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -40,6 +40,43 @@ Examples of effects of the debug mode: `. +Cancellation +------------ + +Cancellation of tasks is not common in classic programming. In asynchronous +programming, not only it is something common, but you have to prepare your +code to handle it. + +Futures and tasks can be cancelled explicitly with their :meth:`Future.cancel` +method. The :func:`wait_for` function cancels the waited task when the timeout +occurs. There are many other cases where a task can be cancelled indirectly. + +Don't call :meth:`~Future.set_result` or :meth:`~Future.set_exception` method +of :class:`Future` if the future is cancelled: it would fail with an exception. +For example, write:: + + if not fut.cancelled(): + fut.set_result('done') + +Don't schedule directly a call to the :meth:`~Future.set_result` or the +:meth:`~Future.set_exception` method of a future with +:meth:`BaseEventLoop.call_soon`: the future can be cancelled before its method +is called. + +If you wait for a future, you should check early if the future was cancelled to +avoid useless operations. Example:: + + @coroutine + def slow_operation(fut): + if fut.cancelled(): + return + # ... slow computation ... + yield from fut + # ... + +The :func:`shield` function can also be used to ignore cancellation. + + .. _asyncio-multithreading: Concurrency and multithreading -- cgit v0.12 From f2e43cbbd4b9325da638a6b45b52e766ca91b131 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 01:20:44 +0100 Subject: Issue #23347, asyncio: send_signal(), terminate(), kill() don't check if the transport was closed. The check broken a Tulip example and this limitation is arbitrary. Check if _proc is None should be enough. Enhance also close(): do nothing when called the second time. 
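The practical consequence: sending a signal to a child that has already exited
raises ProcessLookupError, while a transport that was merely closed no longer
triggers an error. A minimal sketch of the resulting pattern (illustrative
only, not part of this patch; stop() is an invented helper name)::

    import asyncio

    @asyncio.coroutine
    def stop(proc):
        # proc is an asyncio.subprocess.Process
        try:
            proc.terminate()
        except ProcessLookupError:
            pass                    # the child already exited
        yield from proc.wait()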
--- Lib/asyncio/base_subprocess.py | 7 +++---- Lib/test/test_asyncio/test_subprocess.py | 16 ---------------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 70676ab..02b9e89 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -84,6 +84,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport): raise NotImplementedError def close(self): + if self._closed: + return self._closed = True for proto in self._pipes.values(): @@ -100,8 +102,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): except ProcessLookupError: pass - # Don't clear the _proc reference yet because _post_init() may - # still run + # Don't clear the _proc reference yet: _post_init() may still run # On Python 3.3 and older, objects with a destructor part of a reference # cycle are never destroyed. It's not more the case on Python 3.4 thanks @@ -125,8 +126,6 @@ class BaseSubprocessTransport(transports.SubprocessTransport): return None def _check_proc(self): - if self._closed: - raise ValueError("operation on closed transport") if self._proc is None: raise ProcessLookupError() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index d4b71b7..b467b04 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -47,22 +47,6 @@ class SubprocessTransportTests(test_utils.TestCase): None, None, None, 0, waiter=waiter) return (transport, protocol) - def test_close(self): - waiter = asyncio.Future(loop=self.loop) - transport, protocol = self.create_transport(waiter) - transport._process_exited(0) - transport.close() - - # The loop didn't run yet - self.assertFalse(protocol.connection_made.called) - - # methods must raise ProcessLookupError if the transport was closed - self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM) - self.assertRaises(ValueError, transport.terminate) - self.assertRaises(ValueError, transport.kill) - - self.loop.run_until_complete(waiter) - def test_proc_exited(self): waiter = asyncio.Future(loop=self.loop) transport, protocol = self.create_transport(waiter) -- cgit v0.12 From 188f2c0b75c69cfbe082f1eca2ed8f8a04fa1718 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 30 Jan 2015 01:35:14 +0100 Subject: asyncio doc: document the new ResourceWarning warnings --- Doc/library/asyncio-dev.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst index 72a06f5..ce1275b 100644 --- a/Doc/library/asyncio-dev.rst +++ b/Doc/library/asyncio-dev.rst @@ -372,3 +372,14 @@ traceback where the task was created. Example of log in debug mode:: :ref:`Detect coroutine objects never scheduled `. + +Close transports +---------------- + +When a transport is no more needed, call its ``close()`` method to release +resources. + +If a transport (or an event loop) is not closed explicitly, a +:exc:`ResourceWarning` warning will be emitted in its destructor. The +:exc:`ResourceWarning` warnings are hidden by default: use the ``-Wd`` command +line option of Python to show them. -- cgit v0.12 From 756f0b19823dbd44fa1cc32089963971b7169cc4 Mon Sep 17 00:00:00 2001 From: R David Murray Date: Thu, 29 Jan 2015 19:53:33 -0500 Subject: Fix asyncio doc typo. 
--- Doc/library/asyncio-eventloop.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 12e60c4..4f7fdfe 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -641,7 +641,7 @@ Server Server listening on sockets. Object created by the :meth:`BaseEventLoop.create_server` method and the - :func:`start_server` function. Don't instanciate the class directly. + :func:`start_server` function. Don't instantiate the class directly. .. method:: close() -- cgit v0.12