summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/asyncio/compat.py17
-rw-r--r--Lib/asyncio/coroutines.py6
-rw-r--r--Lib/asyncio/events.py7
-rw-r--r--Lib/asyncio/futures.py10
-rw-r--r--Lib/asyncio/locks.py7
-rw-r--r--Lib/asyncio/queues.py7
-rw-r--r--Lib/asyncio/streams.py24
-rw-r--r--Lib/asyncio/subprocess.py2
-rw-r--r--Lib/asyncio/tasks.py7
-rw-r--r--Lib/asyncio/transports.py12
-rw-r--r--Lib/test/test_asyncio/test_streams.py28
11 files changed, 73 insertions, 54 deletions
diff --git a/Lib/asyncio/compat.py b/Lib/asyncio/compat.py
new file mode 100644
index 0000000..660b7e7
--- /dev/null
+++ b/Lib/asyncio/compat.py
@@ -0,0 +1,17 @@
+"""Compatibility helpers for the different Python versions."""
+
+import sys
+
+PY34 = sys.version_info >= (3, 4)
+PY35 = sys.version_info >= (3, 5)
+
+
+def flatten_list_bytes(list_of_data):
+ """Concatenate a sequence of bytes-like objects."""
+ if not PY34:
+ # On Python 3.3 and older, bytes.join() doesn't handle
+ # memoryview.
+ list_of_data = (
+ bytes(data) if isinstance(data, memoryview) else data
+ for data in list_of_data)
+ return b''.join(list_of_data)
diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
index 15475f2..e11b21b 100644
--- a/Lib/asyncio/coroutines.py
+++ b/Lib/asyncio/coroutines.py
@@ -9,14 +9,12 @@ import sys
import traceback
import types
+from . import compat
from . import events
from . import futures
from .log import logger
-_PY35 = sys.version_info >= (3, 5)
-
-
# Opcode of "yield from" instruction
_YIELD_FROM = opcode.opmap['YIELD_FROM']
@@ -140,7 +138,7 @@ class CoroWrapper:
def gi_code(self):
return self.gen.gi_code
- if _PY35:
+ if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 496075b..d5f0d45 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -17,12 +17,11 @@ import sys
import threading
import traceback
-
-_PY34 = sys.version_info >= (3, 4)
+from asyncio import compat
def _get_function_source(func):
- if _PY34:
+ if compat.PY34:
func = inspect.unwrap(func)
elif hasattr(func, '__wrapped__'):
func = func.__wrapped__
@@ -31,7 +30,7 @@ def _get_function_source(func):
return (code.co_filename, code.co_firstlineno)
if isinstance(func, functools.partial):
return _get_function_source(func.func)
- if _PY34 and isinstance(func, functools.partialmethod):
+ if compat.PY34 and isinstance(func, functools.partialmethod):
return _get_function_source(func.func)
return None
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index d06828a..dbe06c4 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -11,6 +11,7 @@ import reprlib
import sys
import traceback
+from . import compat
from . import events
# States for Future.
@@ -18,9 +19,6 @@ _PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'
-_PY34 = sys.version_info >= (3, 4)
-_PY35 = sys.version_info >= (3, 5)
-
Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
TimeoutError = concurrent.futures.TimeoutError
@@ -199,7 +197,7 @@ class Future:
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
- if _PY34:
+ if compat.PY34:
def __del__(self):
if not self._log_traceback:
# set_exception() was not called, or result() or exception()
@@ -352,7 +350,7 @@ class Future:
self._exception = exception
self._state = _FINISHED
self._schedule_callbacks()
- if _PY34:
+ if compat.PY34:
self._log_traceback = True
else:
self._tb_logger = _TracebackLogger(self, exception)
@@ -388,7 +386,7 @@ class Future:
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
- if _PY35:
+ if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index b2e516b..7a13279 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -3,16 +3,13 @@
__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
import collections
-import sys
+from . import compat
from . import events
from . import futures
from .coroutines import coroutine
-_PY35 = sys.version_info >= (3, 5)
-
-
class _ContextManager:
"""Context manager.
@@ -70,7 +67,7 @@ class _ContextManagerMixin:
yield from self.acquire()
return _ContextManager(self)
- if _PY35:
+ if compat.PY35:
def __await__(self):
# To make "with await lock" work.
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index e9ba0cd..c55dd8b 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -5,6 +5,7 @@ __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
import collections
import heapq
+from . import compat
from . import events
from . import futures
from . import locks
@@ -286,3 +287,9 @@ class LifoQueue(Queue):
def _get(self):
return self._queue.pop()
+
+
+if not compat.PY35:
+ JoinableQueue = Queue
+ """Deprecated alias for Queue."""
+ __all__.append('JoinableQueue')
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 176c65e..6484c43 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -6,12 +6,12 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
]
import socket
-import sys
if hasattr(socket, 'AF_UNIX'):
__all__.extend(['open_unix_connection', 'start_unix_server'])
from . import coroutines
+from . import compat
from . import events
from . import futures
from . import protocols
@@ -20,7 +20,6 @@ from .log import logger
_DEFAULT_LIMIT = 2**16
-_PY35 = sys.version_info >= (3, 5)
class IncompleteReadError(EOFError):
@@ -240,6 +239,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
def eof_received(self):
self._stream_reader.feed_eof()
+ return True
class StreamWriter:
@@ -321,6 +321,24 @@ class StreamReader:
self._transport = None
self._paused = False
+ def __repr__(self):
+ info = ['StreamReader']
+ if self._buffer:
+ info.append('%d bytes' % len(info))
+ if self._eof:
+ info.append('eof')
+ if self._limit != _DEFAULT_LIMIT:
+ info.append('l=%d' % self._limit)
+ if self._waiter:
+ info.append('w=%r' % self._waiter)
+ if self._exception:
+ info.append('e=%r' % self._exception)
+ if self._transport:
+ info.append('t=%r' % self._transport)
+ if self._paused:
+ info.append('paused')
+ return '<%s>' % ' '.join(info)
+
def exception(self):
return self._exception
@@ -488,7 +506,7 @@ class StreamReader:
return b''.join(blocks)
- if _PY35:
+ if compat.PY35:
@coroutine
def __aiter__(self):
return self
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index 4600a9f..ead4039 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -1,10 +1,8 @@
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
-import collections
import subprocess
from . import events
-from . import futures
from . import protocols
from . import streams
from . import tasks
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index d8193ba..9bfc1cf 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -10,19 +10,16 @@ import concurrent.futures
import functools
import inspect
import linecache
-import sys
-import types
import traceback
import warnings
import weakref
+from . import compat
from . import coroutines
from . import events
from . import futures
from .coroutines import coroutine
-_PY34 = (sys.version_info >= (3, 4))
-
class Task(futures.Future):
"""A coroutine wrapped in a Future."""
@@ -83,7 +80,7 @@ class Task(futures.Future):
# On Python 3.3 or older, objects with a destructor that are part of a
# reference cycle are never destroyed. That's not the case any more on
# Python 3.4 thanks to the PEP 442.
- if _PY34:
+ if compat.PY34:
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
context = {
diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py
index 22df3c7..70b323f 100644
--- a/Lib/asyncio/transports.py
+++ b/Lib/asyncio/transports.py
@@ -1,8 +1,6 @@
"""Abstract Transport class."""
-import sys
-
-_PY34 = sys.version_info >= (3, 4)
+from asyncio import compat
__all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport',
'Transport', 'DatagramTransport', 'SubprocessTransport',
@@ -94,12 +92,8 @@ class WriteTransport(BaseTransport):
The default implementation concatenates the arguments and
calls write() on the result.
"""
- if not _PY34:
- # In Python 3.3, bytes.join() doesn't handle memoryview.
- list_of_data = (
- bytes(data) if isinstance(data, memoryview) else data
- for data in list_of_data)
- self.write(b''.join(list_of_data))
+ data = compat.flatten_list_bytes(list_of_data)
+ self.write(data)
def write_eof(self):
"""Close the write end after flushing buffered data.
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 242b377..ef6f603 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -446,6 +446,8 @@ class StreamReaderTests(test_utils.TestCase):
def handle_client(self, client_reader, client_writer):
data = yield from client_reader.readline()
client_writer.write(data)
+ yield from client_writer.drain()
+ client_writer.close()
def start(self):
sock = socket.socket()
@@ -457,12 +459,8 @@ class StreamReaderTests(test_utils.TestCase):
return sock.getsockname()
def handle_client_callback(self, client_reader, client_writer):
- task = asyncio.Task(client_reader.readline(), loop=self.loop)
-
- def done(task):
- client_writer.write(task.result())
-
- task.add_done_callback(done)
+ self.loop.create_task(self.handle_client(client_reader,
+ client_writer))
def start_callback(self):
sock = socket.socket()
@@ -522,6 +520,8 @@ class StreamReaderTests(test_utils.TestCase):
def handle_client(self, client_reader, client_writer):
data = yield from client_reader.readline()
client_writer.write(data)
+ yield from client_writer.drain()
+ client_writer.close()
def start(self):
self.server = self.loop.run_until_complete(
@@ -530,18 +530,14 @@ class StreamReaderTests(test_utils.TestCase):
loop=self.loop))
def handle_client_callback(self, client_reader, client_writer):
- task = asyncio.Task(client_reader.readline(), loop=self.loop)
-
- def done(task):
- client_writer.write(task.result())
-
- task.add_done_callback(done)
+ self.loop.create_task(self.handle_client(client_reader,
+ client_writer))
def start_callback(self):
- self.server = self.loop.run_until_complete(
- asyncio.start_unix_server(self.handle_client_callback,
- path=self.path,
- loop=self.loop))
+ start = asyncio.start_unix_server(self.handle_client_callback,
+ path=self.path,
+ loop=self.loop)
+ self.server = self.loop.run_until_complete(start)
def stop(self):
if self.server is not None: