summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Doc/library/queue.rst7
-rw-r--r--Lib/asyncio/base_events.py2
-rw-r--r--Lib/asyncio/coroutines.py4
-rw-r--r--Lib/asyncio/futures.py24
-rw-r--r--Lib/asyncio/tasks.py8
-rw-r--r--Lib/test/test_asyncio/test_events.py2
-rw-r--r--Lib/test/test_asyncio/test_futures.py68
7 files changed, 96 insertions, 19 deletions
diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst
index c7dfc35..ed408f4 100644
--- a/Doc/library/queue.rst
+++ b/Doc/library/queue.rst
@@ -23,6 +23,8 @@ the first retrieved (operating like a stack). With a priority queue,
the entries are kept sorted (using the :mod:`heapq` module) and the
lowest valued entry is retrieved first.
+Internally, the module uses locks to temporarily block competing threads;
+however, it is not designed to handle reentrancy within a thread.
The :mod:`queue` module defines the following classes and exceptions:
@@ -189,11 +191,6 @@ Example of how to wait for enqueued tasks to be completed::
t.join()
-.. note::
-
- The :mod:`queue` module is not safe for use from :mod:`signal` handlers as
- it uses :mod:`threading` locks.
-
.. seealso::
Class :class:`multiprocessing.Queue`
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index b420586..9c2fa12 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -414,7 +414,7 @@ class BaseEventLoop(events.AbstractEventLoop):
"""
self._check_closed()
- new_task = not isinstance(future, futures.Future)
+ new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
index 9c338b0..d92f67d 100644
--- a/Lib/asyncio/coroutines.py
+++ b/Lib/asyncio/coroutines.py
@@ -204,8 +204,8 @@ def coroutine(func):
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
- if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
- isinstance(res, CoroWrapper):
+ if (futures.isfuture(res) or inspect.isgenerator(res) or
+ isinstance(res, CoroWrapper)):
res = yield from res
elif _AwaitableABC is not None:
# If 'func' returns an Awaitable (new in 3.5) we
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index edc13dc..bcd4d16 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -110,6 +110,16 @@ class _TracebackLogger:
self.loop.call_exception_handler({'message': msg})
+def isfuture(obj):
+ """Check for a Future.
+
+ This returns True when obj is a Future instance or is advertising
+ itself as duck-type compatible by setting _asyncio_future_blocking.
+ See comment in Future for more details.
+ """
+ return getattr(obj, '_asyncio_future_blocking', None) is not None
+
+
class Future:
"""This class is *almost* compatible with concurrent.futures.Future.
@@ -423,15 +433,17 @@ def _chain_future(source, destination):
If destination is cancelled, source gets cancelled too.
Compatible with both asyncio.Future and concurrent.futures.Future.
"""
- if not isinstance(source, (Future, concurrent.futures.Future)):
+ if not isfuture(source) and not isinstance(source,
+ concurrent.futures.Future):
raise TypeError('A future is required for source argument')
- if not isinstance(destination, (Future, concurrent.futures.Future)):
+ if not isfuture(destination) and not isinstance(destination,
+ concurrent.futures.Future):
raise TypeError('A future is required for destination argument')
- source_loop = source._loop if isinstance(source, Future) else None
- dest_loop = destination._loop if isinstance(destination, Future) else None
+ source_loop = source._loop if isfuture(source) else None
+ dest_loop = destination._loop if isfuture(destination) else None
def _set_state(future, other):
- if isinstance(future, Future):
+ if isfuture(future):
_copy_future_state(other, future)
else:
_set_concurrent_future_state(future, other)
@@ -455,7 +467,7 @@ def _chain_future(source, destination):
def wrap_future(future, *, loop=None):
"""Wrap concurrent.futures.Future object."""
- if isinstance(future, Future):
+ if isfuture(future):
return future
assert isinstance(future, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(future)
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 3e200f6..35c945c 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -333,7 +333,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
@@ -462,7 +462,7 @@ def as_completed(fs, *, loop=None, timeout=None):
Note: The futures 'f' are not necessarily members of fs.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
@@ -538,7 +538,7 @@ def ensure_future(coro_or_future, *, loop=None):
If the argument is a Future, it is returned directly.
"""
- if isinstance(coro_or_future, futures.Future):
+ if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
@@ -614,7 +614,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
arg_to_fut = {}
for arg in set(coros_or_futures):
- if not isinstance(arg, futures.Future):
+ if not futures.isfuture(arg):
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index e742eb7..7c901f2 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -793,7 +793,7 @@ class EventLoopTestsMixin:
loop.connect_accepted_socket(
(lambda : proto), conn, ssl=server_ssl))
loop.run_forever()
- conn.close()
+ proto.transport.close()
lsock.close()
thread.join(1)
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index c38c1f2..d20eb68 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -25,6 +25,74 @@ def last_cb():
pass
+class DuckFuture:
+ # Class that does not inherit from Future but aims to be duck-type
+ # compatible with it.
+
+ _asyncio_future_blocking = False
+ __cancelled = False
+ __result = None
+ __exception = None
+
+ def cancel(self):
+ if self.done():
+ return False
+ self.__cancelled = True
+ return True
+
+ def cancelled(self):
+ return self.__cancelled
+
+ def done(self):
+ return (self.__cancelled
+ or self.__result is not None
+ or self.__exception is not None)
+
+ def result(self):
+ assert not self.cancelled()
+ if self.__exception is not None:
+ raise self.__exception
+ return self.__result
+
+ def exception(self):
+ assert not self.cancelled()
+ return self.__exception
+
+ def set_result(self, result):
+ assert not self.done()
+ assert result is not None
+ self.__result = result
+
+ def set_exception(self, exception):
+ assert not self.done()
+ assert exception is not None
+ self.__exception = exception
+
+ def __iter__(self):
+ if not self.done():
+ self._asyncio_future_blocking = True
+ yield self
+ assert self.done()
+ return self.result()
+
+
+class DuckTests(test_utils.TestCase):
+
+ def setUp(self):
+ self.loop = self.new_test_loop()
+ self.addCleanup(self.loop.close)
+
+ def test_wrap_future(self):
+ f = DuckFuture()
+ g = asyncio.wrap_future(f)
+ assert g is f
+
+ def test_ensure_future(self):
+ f = DuckFuture()
+ g = asyncio.ensure_future(f)
+ assert g is f
+
+
class FutureTests(test_utils.TestCase):
def setUp(self):