summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/_test_multiprocessing.py2
-rw-r--r--Lib/test/coding20731.py4
-rwxr-xr-xLib/test/regrtest.py9
-rw-r--r--Lib/test/test_asyncio/test_base_events.py110
-rw-r--r--Lib/test/test_asyncio/test_events.py455
-rw-r--r--Lib/test/test_asyncio/test_futures.py16
-rw-r--r--Lib/test/test_asyncio/test_locks.py20
-rw-r--r--Lib/test/test_asyncio/test_proactor_events.py78
-rw-r--r--Lib/test/test_asyncio/test_queues.py4
-rw-r--r--Lib/test/test_asyncio/test_selector_events.py273
-rw-r--r--Lib/test/test_asyncio/test_streams.py5
-rw-r--r--Lib/test/test_asyncio/test_tasks.py8
-rw-r--r--Lib/test/test_asyncio/test_transports.py6
-rw-r--r--Lib/test/test_asyncio/test_unix_events.py200
-rw-r--r--Lib/test/test_asyncio/test_windows_events.py3
-rw-r--r--Lib/test/test_asyncio/test_windows_utils.py31
-rw-r--r--Lib/test/test_copy.py1
-rw-r--r--Lib/test/test_email/test_email.py8
-rw-r--r--Lib/test/test_email/test_policy.py9
-rw-r--r--Lib/test/test_fileinput.py41
-rw-r--r--Lib/test/test_genericpath.py30
-rw-r--r--Lib/test/test_grammar.py4
-rw-r--r--Lib/test/test_gzip.py7
-rw-r--r--Lib/test/test_idle.py1
-rw-r--r--Lib/test/test_importlib/source/test_file_loader.py1
-rw-r--r--Lib/test/test_io.py38
-rw-r--r--Lib/test/test_modulefinder.py20
-rw-r--r--Lib/test/test_ntpath.py27
-rw-r--r--Lib/test/test_pkgutil.py5
-rw-r--r--Lib/test/test_posix.py2
-rw-r--r--Lib/test/test_pydoc.py10
-rw-r--r--Lib/test/test_range.py24
-rw-r--r--Lib/test/test_re.py18
-rw-r--r--Lib/test/test_robotparser.py1
-rw-r--r--Lib/test/test_shutil.py9
-rw-r--r--Lib/test/test_socket.py10
-rw-r--r--Lib/test/test_source_encoding.py10
-rw-r--r--Lib/test/test_support.py2
-rw-r--r--Lib/test/test_sys.py85
-rw-r--r--Lib/test/test_tarfile.py18
-rw-r--r--Lib/test/test_tcl.py1
-rw-r--r--Lib/test/test_threadsignals.py6
-rw-r--r--Lib/test/test_time.py139
-rw-r--r--Lib/test/test_tokenize.py98
-rw-r--r--Lib/test/test_tracemalloc.py2
-rw-r--r--Lib/test/test_types.py26
-rw-r--r--Lib/test/test_urllib2.py3
47 files changed, 1105 insertions, 775 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 06d3ca9..8eb57fe 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -3651,7 +3651,7 @@ class TestSemaphoreTracker(unittest.TestCase):
_multiprocessing.sem_unlink(name1)
p.terminate()
p.wait()
- time.sleep(1.0)
+ time.sleep(2.0)
with self.assertRaises(OSError) as ctx:
_multiprocessing.sem_unlink(name2)
# docs say it should be ENOENT, but OSX seems to give EINVAL
diff --git a/Lib/test/coding20731.py b/Lib/test/coding20731.py
new file mode 100644
index 0000000..b0e227a
--- /dev/null
+++ b/Lib/test/coding20731.py
@@ -0,0 +1,4 @@
+#coding:latin1
+
+
+
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index 17cbccb..c1c831f 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -1373,10 +1373,9 @@ def dash_R(the_module, test, indirect_test, huntrleaks):
try:
import zipimport
except ImportError:
- zsc = zdc = None # Run unmodified on platforms without zipimport support
+ zdc = None # Run unmodified on platforms without zipimport support
else:
zdc = zipimport._zip_directory_cache.copy()
- zsc = zipimport._zip_stat_cache.copy()
abcs = {}
for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
if not isabstract(abc):
@@ -1395,7 +1394,7 @@ def dash_R(the_module, test, indirect_test, huntrleaks):
sys.stderr.flush()
for i in range(repcount):
indirect_test()
- alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, zsc, abcs)
+ alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, abcs)
sys.stderr.write('.')
sys.stderr.flush()
if i >= nwarmup:
@@ -1429,7 +1428,7 @@ def dash_R(the_module, test, indirect_test, huntrleaks):
failed = True
return failed
-def dash_R_cleanup(fs, ps, pic, zdc, zsc, abcs):
+def dash_R_cleanup(fs, ps, pic, zdc, abcs):
import gc, copyreg
import _strptime, linecache
import urllib.parse, urllib.request, mimetypes, doctest
@@ -1455,8 +1454,6 @@ def dash_R_cleanup(fs, ps, pic, zdc, zsc, abcs):
else:
zipimport._zip_directory_cache.clear()
zipimport._zip_directory_cache.update(zdc)
- zipimport._zip_stat_cache.clear()
- zipimport._zip_stat_cache.update(zsc)
# clear type cache
sys._clear_type_cache()
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index 784a39f..340ca67 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -6,8 +6,8 @@ import socket
import sys
import time
import unittest
-import unittest.mock
-from test.support import find_unused_port, IPV6_ENABLED
+from unittest import mock
+from test.support import IPV6_ENABLED
import asyncio
from asyncio import base_events
@@ -15,7 +15,7 @@ from asyncio import constants
from asyncio import test_utils
-MOCK_ANY = unittest.mock.ANY
+MOCK_ANY = mock.ANY
PY34 = sys.version_info >= (3, 4)
@@ -23,11 +23,11 @@ class BaseEventLoopTests(unittest.TestCase):
def setUp(self):
self.loop = base_events.BaseEventLoop()
- self.loop._selector = unittest.mock.Mock()
+ self.loop._selector = mock.Mock()
asyncio.set_event_loop(None)
def test_not_implemented(self):
- m = unittest.mock.Mock()
+ m = mock.Mock()
self.assertRaises(
NotImplementedError,
self.loop._make_socket_transport, m, m)
@@ -75,13 +75,13 @@ class BaseEventLoopTests(unittest.TestCase):
self.assertFalse(self.loop._ready)
def test_set_default_executor(self):
- executor = unittest.mock.Mock()
+ executor = mock.Mock()
self.loop.set_default_executor(executor)
self.assertIs(executor, self.loop._default_executor)
def test_getnameinfo(self):
- sockaddr = unittest.mock.Mock()
- self.loop.run_in_executor = unittest.mock.Mock()
+ sockaddr = mock.Mock()
+ self.loop.run_in_executor = mock.Mock()
self.loop.getnameinfo(sockaddr)
self.assertEqual(
(None, socket.getnameinfo, sockaddr, 0),
@@ -111,7 +111,7 @@ class BaseEventLoopTests(unittest.TestCase):
def cb(arg):
calls.append(arg)
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
self.loop.call_later(-1, cb, 'a')
self.loop.call_later(-2, cb, 'b')
test_utils.run_briefly(self.loop)
@@ -121,7 +121,7 @@ class BaseEventLoopTests(unittest.TestCase):
def cb():
self.loop.stop()
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
delay = 0.1
when = self.loop.time() + delay
@@ -163,7 +163,7 @@ class BaseEventLoopTests(unittest.TestCase):
pass
h = asyncio.Handle(cb, (), self.loop)
f = asyncio.Future(loop=self.loop)
- executor = unittest.mock.Mock()
+ executor = mock.Mock()
executor.submit.return_value = f
self.loop.set_default_executor(executor)
@@ -171,7 +171,7 @@ class BaseEventLoopTests(unittest.TestCase):
res = self.loop.run_in_executor(None, h)
self.assertIs(f, res)
- executor = unittest.mock.Mock()
+ executor = mock.Mock()
executor.submit.return_value = f
res = self.loop.run_in_executor(executor, h)
self.assertIs(f, res)
@@ -187,7 +187,7 @@ class BaseEventLoopTests(unittest.TestCase):
h1.cancel()
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h1)
self.loop._scheduled.append(h2)
self.loop._run_once()
@@ -203,8 +203,8 @@ class BaseEventLoopTests(unittest.TestCase):
self.loop.set_debug(False)
self.assertFalse(self.loop.get_debug())
- @unittest.mock.patch('asyncio.base_events.time')
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.time')
+ @mock.patch('asyncio.base_events.logger')
def test__run_once_logging(self, m_logger, m_time):
# Log to INFO level if timeout > 1.0 sec.
idx = -1
@@ -219,7 +219,7 @@ class BaseEventLoopTests(unittest.TestCase):
self.loop._scheduled.append(
asyncio.TimerHandle(11.0, lambda: True, (), self.loop))
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
self.loop._run_once()
self.assertEqual(logging.INFO, m_logger.log.call_args[0][0])
@@ -242,7 +242,7 @@ class BaseEventLoopTests(unittest.TestCase):
h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
self.loop)
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h)
self.loop._run_once()
@@ -303,14 +303,14 @@ class BaseEventLoopTests(unittest.TestCase):
asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
def test_default_exc_handler_callback(self):
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
def zero_error(fut):
fut.set_result(True)
1/0
# Test call_soon (events.Handle)
- with unittest.mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('asyncio.base_events.logger') as log:
fut = asyncio.Future(loop=self.loop)
self.loop.call_soon(zero_error, fut)
fut.add_done_callback(lambda fut: self.loop.stop())
@@ -320,7 +320,7 @@ class BaseEventLoopTests(unittest.TestCase):
exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
# Test call_later (events.TimerHandle)
- with unittest.mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('asyncio.base_events.logger') as log:
fut = asyncio.Future(loop=self.loop)
self.loop.call_later(0.01, zero_error, fut)
fut.add_done_callback(lambda fut: self.loop.stop())
@@ -330,7 +330,7 @@ class BaseEventLoopTests(unittest.TestCase):
exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
def test_default_exc_handler_coro(self):
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
@asyncio.coroutine
def zero_error_coro():
@@ -338,7 +338,7 @@ class BaseEventLoopTests(unittest.TestCase):
1/0
# Test Future.__del__
- with unittest.mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('asyncio.base_events.logger') as log:
fut = asyncio.async(zero_error_coro(), loop=self.loop)
fut.add_done_callback(lambda *args: self.loop.stop())
self.loop.run_forever()
@@ -368,9 +368,9 @@ class BaseEventLoopTests(unittest.TestCase):
self.loop.call_soon(zero_error)
self.loop._run_once()
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
- mock_handler = unittest.mock.Mock()
+ mock_handler = mock.Mock()
self.loop.set_exception_handler(mock_handler)
run_loop()
mock_handler.assert_called_with(self.loop, {
@@ -382,7 +382,7 @@ class BaseEventLoopTests(unittest.TestCase):
mock_handler.reset_mock()
self.loop.set_exception_handler(None)
- with unittest.mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('asyncio.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
test_utils.MockPattern(
@@ -401,11 +401,11 @@ class BaseEventLoopTests(unittest.TestCase):
def handler(loop, context):
raise AttributeError('spam')
- self.loop._process_events = unittest.mock.Mock()
+ self.loop._process_events = mock.Mock()
self.loop.set_exception_handler(handler)
- with unittest.mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('asyncio.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
test_utils.MockPattern(
@@ -417,8 +417,8 @@ class BaseEventLoopTests(unittest.TestCase):
class Loop(base_events.BaseEventLoop):
- _selector = unittest.mock.Mock()
- _process_events = unittest.mock.Mock()
+ _selector = mock.Mock()
+ _process_events = mock.Mock()
def default_exception_handler(self, context):
nonlocal _context
@@ -435,7 +435,7 @@ class BaseEventLoopTests(unittest.TestCase):
loop.call_soon(zero_error)
loop._run_once()
- with unittest.mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('asyncio.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
'Exception in default exception handler',
@@ -446,7 +446,7 @@ class BaseEventLoopTests(unittest.TestCase):
_context = None
loop.set_exception_handler(custom_handler)
- with unittest.mock.patch('asyncio.base_events.logger') as log:
+ with mock.patch('asyncio.base_events.logger') as log:
run_loop()
log.error.assert_called_with(
test_utils.MockPattern('Exception in default exception.*'
@@ -527,7 +527,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
def tearDown(self):
self.loop.close()
- @unittest.mock.patch('asyncio.base_events.socket')
+ @mock.patch('asyncio.base_events.socket')
def test_create_connection_multiple_errors(self, m_socket):
class MyProto(asyncio.Protocol):
@@ -592,7 +592,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
self.loop.getaddrinfo = getaddrinfo_task
- self.loop.sock_connect = unittest.mock.Mock()
+ self.loop.sock_connect = mock.Mock()
self.loop.sock_connect.side_effect = OSError
coro = self.loop.create_connection(MyProto, 'example.com', 80)
@@ -609,7 +609,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
self.loop.getaddrinfo = getaddrinfo_task
- self.loop.sock_connect = unittest.mock.Mock()
+ self.loop.sock_connect = mock.Mock()
self.loop.sock_connect.side_effect = OSError
coro = self.loop.create_connection(
@@ -617,7 +617,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
with self.assertRaises(OSError):
self.loop.run_until_complete(coro)
- @unittest.mock.patch('asyncio.base_events.socket')
+ @mock.patch('asyncio.base_events.socket')
def test_create_connection_multiple_errors_local_addr(self, m_socket):
def bind(addr):
@@ -637,7 +637,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
self.loop.getaddrinfo = getaddrinfo_task
- self.loop.sock_connect = unittest.mock.Mock()
+ self.loop.sock_connect = mock.Mock()
self.loop.sock_connect.side_effect = OSError('Err2')
coro = self.loop.create_connection(
@@ -669,7 +669,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
OSError, self.loop.run_until_complete, coro)
def test_create_connection_ssl_server_hostname_default(self):
- self.loop.getaddrinfo = unittest.mock.Mock()
+ self.loop.getaddrinfo = mock.Mock()
def mock_getaddrinfo(*args, **kwds):
f = asyncio.Future(loop=self.loop)
@@ -678,9 +678,9 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
return f
self.loop.getaddrinfo.side_effect = mock_getaddrinfo
- self.loop.sock_connect = unittest.mock.Mock()
+ self.loop.sock_connect = mock.Mock()
self.loop.sock_connect.return_value = ()
- self.loop._make_ssl_transport = unittest.mock.Mock()
+ self.loop._make_ssl_transport = mock.Mock()
class _SelectorTransportMock:
_sock = None
@@ -696,7 +696,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
return transport
self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
- ANY = unittest.mock.ANY
+ ANY = mock.ANY
# First try the default server_hostname.
self.loop._make_ssl_transport.reset_mock()
coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True)
@@ -775,13 +775,13 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
self.assertRaises(ValueError, self.loop.run_until_complete, fut)
def test_create_server_no_getaddrinfo(self):
- getaddrinfo = self.loop.getaddrinfo = unittest.mock.Mock()
+ getaddrinfo = self.loop.getaddrinfo = mock.Mock()
getaddrinfo.return_value = []
f = self.loop.create_server(MyProto, '0.0.0.0', 0)
self.assertRaises(OSError, self.loop.run_until_complete, f)
- @unittest.mock.patch('asyncio.base_events.socket')
+ @mock.patch('asyncio.base_events.socket')
def test_create_server_cant_bind(self, m_socket):
class Err(OSError):
@@ -790,14 +790,14 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
m_socket.getaddrinfo.return_value = [
(2, 1, 6, '', ('127.0.0.1', 10100))]
m_socket.getaddrinfo._is_coroutine = False
- m_sock = m_socket.socket.return_value = unittest.mock.Mock()
+ m_sock = m_socket.socket.return_value = mock.Mock()
m_sock.bind.side_effect = Err
fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
self.assertRaises(OSError, self.loop.run_until_complete, fut)
self.assertTrue(m_sock.close.called)
- @unittest.mock.patch('asyncio.base_events.socket')
+ @mock.patch('asyncio.base_events.socket')
def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
m_socket.getaddrinfo.return_value = []
m_socket.getaddrinfo._is_coroutine = False
@@ -818,7 +818,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
AssertionError, self.loop.run_until_complete, coro)
def test_create_datagram_endpoint_connect_err(self):
- self.loop.sock_connect = unittest.mock.Mock()
+ self.loop.sock_connect = mock.Mock()
self.loop.sock_connect.side_effect = OSError
coro = self.loop.create_datagram_endpoint(
@@ -826,7 +826,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
self.assertRaises(
OSError, self.loop.run_until_complete, coro)
- @unittest.mock.patch('asyncio.base_events.socket')
+ @mock.patch('asyncio.base_events.socket')
def test_create_datagram_endpoint_socket_err(self, m_socket):
m_socket.getaddrinfo = socket.getaddrinfo
m_socket.socket.side_effect = OSError
@@ -849,7 +849,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
self.assertRaises(
ValueError, self.loop.run_until_complete, coro)
- @unittest.mock.patch('asyncio.base_events.socket')
+ @mock.patch('asyncio.base_events.socket')
def test_create_datagram_endpoint_setblk_err(self, m_socket):
m_socket.socket.return_value.setblocking.side_effect = OSError
@@ -865,14 +865,14 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
asyncio.DatagramProtocol)
self.assertRaises(ValueError, self.loop.run_until_complete, coro)
- @unittest.mock.patch('asyncio.base_events.socket')
+ @mock.patch('asyncio.base_events.socket')
def test_create_datagram_endpoint_cant_bind(self, m_socket):
class Err(OSError):
pass
m_socket.AF_INET6 = socket.AF_INET6
m_socket.getaddrinfo = socket.getaddrinfo
- m_sock = m_socket.socket.return_value = unittest.mock.Mock()
+ m_sock = m_socket.socket.return_value = mock.Mock()
m_sock.bind.side_effect = Err
fut = self.loop.create_datagram_endpoint(
@@ -882,19 +882,19 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
self.assertTrue(m_sock.close.called)
def test_accept_connection_retry(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.accept.side_effect = BlockingIOError()
self.loop._accept_connection(MyProto, sock)
self.assertFalse(sock.close.called)
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_accept_connection_exception(self, m_log):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
- self.loop.remove_reader = unittest.mock.Mock()
- self.loop.call_later = unittest.mock.Mock()
+ self.loop.remove_reader = mock.Mock()
+ self.loop.call_later = mock.Mock()
self.loop._accept_connection(MyProto, sock)
self.assertTrue(m_log.error.called)
@@ -902,7 +902,7 @@ class BaseEventLoopWithSelectorTests(unittest.TestCase):
self.loop.remove_reader.assert_called_with(10)
self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
# self.loop._start_serving
- unittest.mock.ANY,
+ mock.ANY,
MyProto, sock, None, None)
def test_call_coroutine(self):
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index d00af23..bafa875 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -20,12 +20,11 @@ import threading
import time
import errno
import unittest
-import unittest.mock
+from unittest import mock
from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
import asyncio
-from asyncio import events
from asyncio import selector_events
from asyncio import test_utils
@@ -57,6 +56,7 @@ SIGNING_CA = data_file('pycacert.pem')
class MyBaseProto(asyncio.Protocol):
+ connected = None
done = None
def __init__(self, loop=None):
@@ -64,12 +64,15 @@ class MyBaseProto(asyncio.Protocol):
self.state = 'INITIAL'
self.nbytes = 0
if loop is not None:
+ self.connected = asyncio.Future(loop=loop)
self.done = asyncio.Future(loop=loop)
def connection_made(self, transport):
self.transport = transport
assert self.state == 'INITIAL', self.state
self.state = 'CONNECTED'
+ if self.connected:
+ self.connected.set_result(None)
def data_received(self, data):
assert self.state == 'CONNECTED', self.state
@@ -331,7 +334,8 @@ class EventLoopTestsMixin:
def test_reader_callback(self):
r, w = test_utils.socketpair()
- bytes_read = []
+ r.setblocking(False)
+ bytes_read = bytearray()
def reader():
try:
@@ -341,37 +345,40 @@ class EventLoopTestsMixin:
# at least on Linux -- see man select.
return
if data:
- bytes_read.append(data)
+ bytes_read.extend(data)
else:
self.assertTrue(self.loop.remove_reader(r.fileno()))
r.close()
self.loop.add_reader(r.fileno(), reader)
self.loop.call_soon(w.send, b'abc')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
self.loop.call_soon(w.send, b'def')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
self.loop.call_soon(w.close)
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
- self.assertEqual(b''.join(bytes_read), b'abcdef')
+ self.assertEqual(bytes_read, b'abcdef')
def test_writer_callback(self):
r, w = test_utils.socketpair()
w.setblocking(False)
- self.loop.add_writer(w.fileno(), w.send, b'x'*(256*1024))
- test_utils.run_briefly(self.loop)
- def remove_writer():
- self.assertTrue(self.loop.remove_writer(w.fileno()))
+ def writer(data):
+ w.send(data)
+ self.loop.stop()
- self.loop.call_soon(remove_writer)
- self.loop.call_soon(self.loop.stop)
+ data = b'x' * 1024
+ self.loop.add_writer(w.fileno(), writer, data)
self.loop.run_forever()
+
+ self.assertTrue(self.loop.remove_writer(w.fileno()))
+ self.assertFalse(self.loop.remove_writer(w.fileno()))
+
w.close()
- data = r.recv(256*1024)
+ read = r.recv(len(data) * 2)
r.close()
- self.assertGreaterEqual(len(data), 200)
+ self.assertEqual(read, data)
def _basetest_sock_client_ops(self, httpd, sock):
sock.setblocking(False)
@@ -465,10 +472,10 @@ class EventLoopTestsMixin:
self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
# Now set a handler and handle it.
self.loop.add_signal_handler(signal.SIGINT, my_handler)
- test_utils.run_briefly(self.loop)
+
os.kill(os.getpid(), signal.SIGINT)
- test_utils.run_briefly(self.loop)
- self.assertEqual(caught, 1)
+ test_utils.run_until(self.loop, lambda: caught)
+
# Removing it should restore the default handler.
self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
self.assertEqual(signal.getsignal(signal.SIGINT),
@@ -624,7 +631,7 @@ class EventLoopTestsMixin:
self.assertIn(str(httpd.address), cm.exception.strerror)
def test_create_server(self):
- proto = MyProto()
+ proto = MyProto(self.loop)
f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
server = self.loop.run_until_complete(f)
self.assertEqual(len(server.sockets), 1)
@@ -634,14 +641,11 @@ class EventLoopTestsMixin:
client = socket.socket()
client.connect(('127.0.0.1', port))
client.sendall(b'xxx')
- test_utils.run_briefly(self.loop)
- test_utils.run_until(self.loop, lambda: proto is not None, 10)
- self.assertIsInstance(proto, MyProto)
- self.assertEqual('INITIAL', proto.state)
- test_utils.run_briefly(self.loop)
+
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# extra info is available
@@ -651,7 +655,7 @@ class EventLoopTestsMixin:
# close connection
proto.transport.close()
- test_utils.run_briefly(self.loop) # windows iocp
+ self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
@@ -673,27 +677,22 @@ class EventLoopTestsMixin:
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server(self):
- proto = MyProto()
+ proto = MyProto(loop=self.loop)
server, path = self._make_unix_server(lambda: proto)
self.assertEqual(len(server.sockets), 1)
client = socket.socket(socket.AF_UNIX)
client.connect(path)
client.sendall(b'xxx')
- test_utils.run_briefly(self.loop)
- test_utils.run_until(self.loop, lambda: proto is not None, 10)
- self.assertIsInstance(proto, MyProto)
- self.assertEqual('INITIAL', proto.state)
- test_utils.run_briefly(self.loop)
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# close connection
proto.transport.close()
- test_utils.run_briefly(self.loop) # windows iocp
+ self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
@@ -736,12 +735,10 @@ class EventLoopTestsMixin:
client, pr = self.loop.run_until_complete(f_c)
client.write(b'xxx')
- test_utils.run_briefly(self.loop)
- self.assertIsInstance(proto, MyProto)
- test_utils.run_briefly(self.loop)
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# extra info is available
@@ -775,12 +772,9 @@ class EventLoopTestsMixin:
client, pr = self.loop.run_until_complete(f_c)
client.write(b'xxx')
- test_utils.run_briefly(self.loop)
- self.assertIsInstance(proto, MyProto)
- test_utils.run_briefly(self.loop)
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# close connection
@@ -1045,15 +1039,9 @@ class EventLoopTestsMixin:
self.assertEqual('INITIALIZED', client.state)
transport.sendto(b'xxx')
- for _ in range(1000):
- if server.nbytes:
- break
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: server.nbytes)
self.assertEqual(3, server.nbytes)
- for _ in range(1000):
- if client.nbytes:
- break
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: client.nbytes)
# received
self.assertEqual(8, client.nbytes)
@@ -1098,11 +1086,11 @@ class EventLoopTestsMixin:
self.loop.run_until_complete(connect())
os.write(wpipe, b'1')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
self.assertEqual(1, proto.nbytes)
os.write(wpipe, b'2345')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
self.assertEqual(5, proto.nbytes)
@@ -1156,33 +1144,30 @@ class EventLoopTestsMixin:
@unittest.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
def test_write_pipe(self):
- proto = MyWritePipeProto(loop=self.loop)
- transport = None
-
rpipe, wpipe = os.pipe()
pipeobj = io.open(wpipe, 'wb', 1024)
- @asyncio.coroutine
- def connect():
- nonlocal transport
- t, p = yield from self.loop.connect_write_pipe(
- lambda: proto, pipeobj)
- self.assertIs(p, proto)
- self.assertIs(t, proto.transport)
- self.assertEqual('CONNECTED', proto.state)
- transport = t
-
- self.loop.run_until_complete(connect())
+ proto = MyWritePipeProto(loop=self.loop)
+ connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
+ transport, p = self.loop.run_until_complete(connect)
+ self.assertIs(p, proto)
+ self.assertIs(transport, proto.transport)
+ self.assertEqual('CONNECTED', proto.state)
transport.write(b'1')
- test_utils.run_briefly(self.loop)
- data = os.read(rpipe, 1024)
+
+ data = bytearray()
+ def reader(data):
+ chunk = os.read(rpipe, 1024)
+ data += chunk
+ return len(data)
+
+ test_utils.run_until(self.loop, lambda: reader(data) >= 1)
self.assertEqual(b'1', data)
transport.write(b'2345')
- test_utils.run_briefly(self.loop)
- data = os.read(rpipe, 1024)
- self.assertEqual(b'2345', data)
+ test_utils.run_until(self.loop, lambda: reader(data) >= 5)
+ self.assertEqual(b'12345', data)
self.assertEqual('CONNECTED', proto.state)
os.close(rpipe)
@@ -1198,23 +1183,14 @@ class EventLoopTestsMixin:
@unittest.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
def test_write_pipe_disconnect_on_close(self):
- proto = MyWritePipeProto(loop=self.loop)
- transport = None
-
rsock, wsock = test_utils.socketpair()
pipeobj = io.open(wsock.detach(), 'wb', 1024)
- @asyncio.coroutine
- def connect():
- nonlocal transport
- t, p = yield from self.loop.connect_write_pipe(lambda: proto,
- pipeobj)
- self.assertIs(p, proto)
- self.assertIs(t, proto.transport)
- self.assertEqual('CONNECTED', proto.state)
- transport = t
-
- self.loop.run_until_complete(connect())
+ proto = MyWritePipeProto(loop=self.loop)
+ connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
+ transport, p = self.loop.run_until_complete(connect)
+ self.assertIs(p, proto)
+ self.assertIs(transport, proto.transport)
self.assertEqual('CONNECTED', proto.state)
transport.write(b'1')
@@ -1232,33 +1208,32 @@ class EventLoopTestsMixin:
# older than 10.6 (Snow Leopard)
@support.requires_mac_ver(10, 6)
def test_write_pty(self):
- proto = MyWritePipeProto(loop=self.loop)
- transport = None
-
master, slave = os.openpty()
slave_write_obj = io.open(slave, 'wb', 0)
- @asyncio.coroutine
- def connect():
- nonlocal transport
- t, p = yield from self.loop.connect_write_pipe(lambda: proto,
- slave_write_obj)
- self.assertIs(p, proto)
- self.assertIs(t, proto.transport)
- self.assertEqual('CONNECTED', proto.state)
- transport = t
-
- self.loop.run_until_complete(connect())
+ proto = MyWritePipeProto(loop=self.loop)
+ connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
+ transport, p = self.loop.run_until_complete(connect)
+ self.assertIs(p, proto)
+ self.assertIs(transport, proto.transport)
+ self.assertEqual('CONNECTED', proto.state)
transport.write(b'1')
- test_utils.run_briefly(self.loop)
- data = os.read(master, 1024)
+
+ data = bytearray()
+ def reader(data):
+ chunk = os.read(master, 1024)
+ data += chunk
+ return len(data)
+
+ test_utils.run_until(self.loop, lambda: reader(data) >= 1,
+ timeout=10)
self.assertEqual(b'1', data)
transport.write(b'2345')
- test_utils.run_briefly(self.loop)
- data = os.read(master, 1024)
- self.assertEqual(b'2345', data)
+ test_utils.run_until(self.loop, lambda: reader(data) >= 5,
+ timeout=10)
+ self.assertEqual(b'12345', data)
self.assertEqual('CONNECTED', proto.state)
os.close(master)
@@ -1352,6 +1327,30 @@ class EventLoopTestsMixin:
self.assertIn('address must be resolved',
str(cm.exception))
+ def test_remove_fds_after_closing(self):
+ loop = self.create_event_loop()
+ callback = lambda: None
+ r, w = test_utils.socketpair()
+ self.addCleanup(r.close)
+ self.addCleanup(w.close)
+ loop.add_reader(r, callback)
+ loop.add_writer(w, callback)
+ loop.close()
+ self.assertFalse(loop.remove_reader(r))
+ self.assertFalse(loop.remove_writer(w))
+
+ def test_add_fds_after_closing(self):
+ loop = self.create_event_loop()
+ callback = lambda: None
+ r, w = test_utils.socketpair()
+ self.addCleanup(r.close)
+ self.addCleanup(w.close)
+ loop.close()
+ with self.assertRaises(RuntimeError):
+ loop.add_reader(r, callback)
+ with self.assertRaises(RuntimeError):
+ loop.add_writer(w, callback)
+
class SubprocessTestsMixin:
@@ -1370,20 +1369,13 @@ class SubprocessTestsMixin:
self.assertEqual(-signal.SIGKILL, returncode)
def test_subprocess_exec(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
@@ -1396,20 +1388,13 @@ class SubprocessTestsMixin:
self.assertEqual(b'Python The Winner', proto.data[1])
def test_subprocess_interactive(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
@@ -1430,18 +1415,11 @@ class SubprocessTestsMixin:
self.check_terminated(proto.returncode)
def test_subprocess_shell(self):
- proto = None
- transp = None
-
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_shell(
- functools.partial(MySubprocessProtocol, self.loop),
- 'echo Python')
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_shell(
+ functools.partial(MySubprocessProtocol, self.loop),
+ 'echo Python')
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
transp.get_pipe_transport(0).close()
@@ -1452,33 +1430,20 @@ class SubprocessTestsMixin:
self.assertEqual(proto.data[2], b'')
def test_subprocess_exitcode(self):
- proto = None
-
- @asyncio.coroutine
- def connect():
- nonlocal proto
- transp, proto = yield from self.loop.subprocess_shell(
- functools.partial(MySubprocessProtocol, self.loop),
- 'exit 7', stdin=None, stdout=None, stderr=None)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_shell(
+ functools.partial(MySubprocessProtocol, self.loop),
+ 'exit 7', stdin=None, stdout=None, stderr=None)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.completed)
self.assertEqual(7, proto.returncode)
def test_subprocess_close_after_finish(self):
- proto = None
- transp = None
-
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_shell(
- functools.partial(MySubprocessProtocol, self.loop),
- 'exit 7', stdin=None, stdout=None, stderr=None)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_shell(
+ functools.partial(MySubprocessProtocol, self.loop),
+ 'exit 7', stdin=None, stdout=None, stderr=None)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.assertIsNone(transp.get_pipe_transport(0))
self.assertIsNone(transp.get_pipe_transport(1))
self.assertIsNone(transp.get_pipe_transport(2))
@@ -1487,20 +1452,13 @@ class SubprocessTestsMixin:
self.assertIsNone(transp.close())
def test_subprocess_kill(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
transp.kill()
@@ -1508,20 +1466,13 @@ class SubprocessTestsMixin:
self.check_killed(proto.returncode)
def test_subprocess_terminate(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
transp.terminate()
@@ -1530,20 +1481,13 @@ class SubprocessTestsMixin:
@unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
def test_subprocess_send_signal(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
transp.send_signal(signal.SIGHUP)
@@ -1551,20 +1495,13 @@ class SubprocessTestsMixin:
self.assertEqual(-signal.SIGHUP, proto.returncode)
def test_subprocess_stderr(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
stdin = transp.get_pipe_transport(0)
@@ -1578,20 +1515,13 @@ class SubprocessTestsMixin:
self.assertEqual(0, proto.returncode)
def test_subprocess_stderr_redirect_to_stdout(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog, stderr=subprocess.STDOUT)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog, stderr=subprocess.STDOUT)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
stdin = transp.get_pipe_transport(0)
@@ -1608,20 +1538,13 @@ class SubprocessTestsMixin:
self.assertEqual(0, proto.returncode)
def test_subprocess_close_client_stream(self):
- proto = None
- transp = None
-
prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
- @asyncio.coroutine
- def connect():
- nonlocal proto, transp
- transp, proto = yield from self.loop.subprocess_exec(
- functools.partial(MySubprocessProtocol, self.loop),
- sys.executable, prog)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ connect = self.loop.subprocess_exec(
+ functools.partial(MySubprocessProtocol, self.loop),
+ sys.executable, prog)
+ transp, proto = self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.connected)
stdin = transp.get_pipe_transport(0)
@@ -1647,20 +1570,13 @@ class SubprocessTestsMixin:
self.check_terminated(proto.returncode)
def test_subprocess_wait_no_same_group(self):
- proto = None
- transp = None
-
- @asyncio.coroutine
- def connect():
- nonlocal proto
- # start the new process in a new session
- transp, proto = yield from self.loop.subprocess_shell(
- functools.partial(MySubprocessProtocol, self.loop),
- 'exit 7', stdin=None, stdout=None, stderr=None,
- start_new_session=True)
- self.assertIsInstance(proto, MySubprocessProtocol)
-
- self.loop.run_until_complete(connect())
+ # start the new process in a new session
+ connect = self.loop.subprocess_shell(
+ functools.partial(MySubprocessProtocol, self.loop),
+ 'exit 7', stdin=None, stdout=None, stderr=None,
+ start_new_session=True)
+ _, proto = yield self.loop.run_until_complete(connect)
+ self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.completed)
self.assertEqual(7, proto.returncode)
@@ -1741,6 +1657,9 @@ if sys.platform == 'win32':
def test_create_datagram_endpoint(self):
raise unittest.SkipTest(
"IocpEventLoop does not have create_datagram_endpoint()")
+
+ def test_remove_fds_after_closing(self):
+ raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
else:
from asyncio import selectors
@@ -1812,7 +1731,7 @@ class HandleTests(unittest.TestCase):
return args
args = ()
- h = asyncio.Handle(callback, args, unittest.mock.Mock())
+ h = asyncio.Handle(callback, args, mock.Mock())
self.assertIs(h._callback, callback)
self.assertIs(h._args, args)
self.assertFalse(h._cancelled)
@@ -1844,15 +1763,15 @@ class HandleTests(unittest.TestCase):
def callback():
raise ValueError()
- m_loop = unittest.mock.Mock()
- m_loop.call_exception_handler = unittest.mock.Mock()
+ m_loop = mock.Mock()
+ m_loop.call_exception_handler = mock.Mock()
h = asyncio.Handle(callback, (), m_loop)
h._run()
m_loop.call_exception_handler.assert_called_with({
'message': test_utils.MockPattern('Exception in callback.*'),
- 'exception': unittest.mock.ANY,
+ 'exception': mock.ANY,
'handle': h
})
@@ -1862,7 +1781,7 @@ class TimerTests(unittest.TestCase):
def test_hash(self):
when = time.monotonic()
h = asyncio.TimerHandle(when, lambda: False, (),
- unittest.mock.Mock())
+ mock.Mock())
self.assertEqual(hash(h), hash(when))
def test_timer(self):
@@ -1871,7 +1790,7 @@ class TimerTests(unittest.TestCase):
args = ()
when = time.monotonic()
- h = asyncio.TimerHandle(when, callback, args, unittest.mock.Mock())
+ h = asyncio.TimerHandle(when, callback, args, mock.Mock())
self.assertIs(h._callback, callback)
self.assertIs(h._args, args)
self.assertFalse(h._cancelled)
@@ -1887,10 +1806,10 @@ class TimerTests(unittest.TestCase):
self.assertRaises(AssertionError,
asyncio.TimerHandle, None, callback, args,
- unittest.mock.Mock())
+ mock.Mock())
def test_timer_comparison(self):
- loop = unittest.mock.Mock()
+ loop = mock.Mock()
def callback(*args):
return args
@@ -1935,7 +1854,7 @@ class TimerTests(unittest.TestCase):
class AbstractEventLoopTests(unittest.TestCase):
def test_not_implemented(self):
- f = unittest.mock.Mock()
+ f = mock.Mock()
loop = asyncio.AbstractEventLoop()
self.assertRaises(
NotImplementedError, loop.run_forever)
@@ -1995,13 +1914,13 @@ class AbstractEventLoopTests(unittest.TestCase):
NotImplementedError, loop.remove_signal_handler, 1)
self.assertRaises(
NotImplementedError, loop.connect_read_pipe, f,
- unittest.mock.sentinel.pipe)
+ mock.sentinel.pipe)
self.assertRaises(
NotImplementedError, loop.connect_write_pipe, f,
- unittest.mock.sentinel.pipe)
+ mock.sentinel.pipe)
self.assertRaises(
NotImplementedError, loop.subprocess_shell, f,
- unittest.mock.sentinel)
+ mock.sentinel)
self.assertRaises(
NotImplementedError, loop.subprocess_exec, f)
@@ -2009,7 +1928,7 @@ class AbstractEventLoopTests(unittest.TestCase):
class ProtocolsAbsTests(unittest.TestCase):
def test_empty(self):
- f = unittest.mock.Mock()
+ f = mock.Mock()
p = asyncio.Protocol()
self.assertIsNone(p.connection_made(f))
self.assertIsNone(p.connection_lost(f))
@@ -2055,7 +1974,7 @@ class PolicyTests(unittest.TestCase):
def test_get_event_loop_calls_set_event_loop(self):
policy = asyncio.DefaultEventLoopPolicy()
- with unittest.mock.patch.object(
+ with mock.patch.object(
policy, "set_event_loop",
wraps=policy.set_event_loop) as m_set_event_loop:
@@ -2073,7 +1992,7 @@ class PolicyTests(unittest.TestCase):
policy.set_event_loop(None)
self.assertRaises(AssertionError, policy.get_event_loop)
- @unittest.mock.patch('asyncio.events.threading.current_thread')
+ @mock.patch('asyncio.events.threading.current_thread')
def test_get_event_loop_thread(self, m_current_thread):
def f():
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index f2b81dd..399e8f4 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -3,7 +3,7 @@
import concurrent.futures
import threading
import unittest
-import unittest.mock
+from unittest import mock
import asyncio
from asyncio import test_utils
@@ -174,20 +174,20 @@ class FutureTests(unittest.TestCase):
self.assertRaises(AssertionError, test)
fut.cancel()
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_tb_logger_abandoned(self, m_log):
fut = asyncio.Future(loop=self.loop)
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_tb_logger_result_unretrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_result(42)
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_tb_logger_result_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_result(42)
@@ -195,7 +195,7 @@ class FutureTests(unittest.TestCase):
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_unretrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
@@ -203,7 +203,7 @@ class FutureTests(unittest.TestCase):
test_utils.run_briefly(self.loop)
self.assertTrue(m_log.error.called)
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
@@ -211,7 +211,7 @@ class FutureTests(unittest.TestCase):
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_result_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
@@ -236,7 +236,7 @@ class FutureTests(unittest.TestCase):
f2 = asyncio.wrap_future(f1)
self.assertIs(f1, f2)
- @unittest.mock.patch('asyncio.futures.events')
+ @mock.patch('asyncio.futures.events')
def test_wrap_future_use_global_loop(self, m_events):
def run(arg):
return (arg, threading.get_ident())
diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py
index 0975f49..f542463 100644
--- a/Lib/test/test_asyncio/test_locks.py
+++ b/Lib/test/test_asyncio/test_locks.py
@@ -1,7 +1,7 @@
"""Tests for lock.py"""
import unittest
-import unittest.mock
+from unittest import mock
import re
import asyncio
@@ -27,7 +27,7 @@ class LockTests(unittest.TestCase):
self.loop.close()
def test_ctor_loop(self):
- loop = unittest.mock.Mock()
+ loop = mock.Mock()
lock = asyncio.Lock(loop=loop)
self.assertIs(lock._loop, loop)
@@ -250,7 +250,7 @@ class EventTests(unittest.TestCase):
self.loop.close()
def test_ctor_loop(self):
- loop = unittest.mock.Mock()
+ loop = mock.Mock()
ev = asyncio.Event(loop=loop)
self.assertIs(ev._loop, loop)
@@ -275,7 +275,7 @@ class EventTests(unittest.TestCase):
self.assertTrue(repr(ev).endswith('[set]>'))
self.assertTrue(RGX_REPR.match(repr(ev)))
- ev._waiters.append(unittest.mock.Mock())
+ ev._waiters.append(mock.Mock())
self.assertTrue('waiters:1' in repr(ev))
self.assertTrue(RGX_REPR.match(repr(ev)))
@@ -386,7 +386,7 @@ class ConditionTests(unittest.TestCase):
self.loop.close()
def test_ctor_loop(self):
- loop = unittest.mock.Mock()
+ loop = mock.Mock()
cond = asyncio.Condition(loop=loop)
self.assertIs(cond._loop, loop)
@@ -644,11 +644,11 @@ class ConditionTests(unittest.TestCase):
self.loop.run_until_complete(cond.acquire())
self.assertTrue('locked' in repr(cond))
- cond._waiters.append(unittest.mock.Mock())
+ cond._waiters.append(mock.Mock())
self.assertTrue('waiters:1' in repr(cond))
self.assertTrue(RGX_REPR.match(repr(cond)))
- cond._waiters.append(unittest.mock.Mock())
+ cond._waiters.append(mock.Mock())
self.assertTrue('waiters:2' in repr(cond))
self.assertTrue(RGX_REPR.match(repr(cond)))
@@ -688,7 +688,7 @@ class SemaphoreTests(unittest.TestCase):
self.loop.close()
def test_ctor_loop(self):
- loop = unittest.mock.Mock()
+ loop = mock.Mock()
sem = asyncio.Semaphore(loop=loop)
self.assertIs(sem._loop, loop)
@@ -717,11 +717,11 @@ class SemaphoreTests(unittest.TestCase):
self.assertTrue('waiters' not in repr(sem))
self.assertTrue(RGX_REPR.match(repr(sem)))
- sem._waiters.append(unittest.mock.Mock())
+ sem._waiters.append(mock.Mock())
self.assertTrue('waiters:1' in repr(sem))
self.assertTrue(RGX_REPR.match(repr(sem)))
- sem._waiters.append(unittest.mock.Mock())
+ sem._waiters.append(mock.Mock())
self.assertTrue('waiters:2' in repr(sem))
self.assertTrue(RGX_REPR.match(repr(sem)))
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
index 0892069..5bf24a4 100644
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -2,7 +2,7 @@
import socket
import unittest
-import unittest.mock
+from unittest import mock
import asyncio
from asyncio.proactor_events import BaseProactorEventLoop
@@ -16,10 +16,10 @@ class ProactorSocketTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
- self.proactor = unittest.mock.Mock()
+ self.proactor = mock.Mock()
self.loop._proactor = self.proactor
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
- self.sock = unittest.mock.Mock(socket.socket)
+ self.sock = mock.Mock(socket.socket)
def test_ctor(self):
fut = asyncio.Future(loop=self.loop)
@@ -56,7 +56,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
self.assertRaises(AssertionError, tr._loop_reading, res)
- tr.close = unittest.mock.Mock()
+ tr.close = mock.Mock()
tr._read_fut = res
tr._loop_reading(res)
self.assertFalse(self.loop._proactor.recv.called)
@@ -67,7 +67,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._fatal_error = unittest.mock.Mock()
+ tr._fatal_error = mock.Mock()
tr._loop_reading()
tr._fatal_error.assert_called_with(
err,
@@ -78,7 +78,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
tr._closing = True
- tr._fatal_error = unittest.mock.Mock()
+ tr._fatal_error = mock.Mock()
tr._loop_reading()
self.assertFalse(tr._fatal_error.called)
@@ -86,7 +86,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
self.loop._proactor.recv.side_effect = ConnectionAbortedError()
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
tr._closing = False
- tr._fatal_error = unittest.mock.Mock()
+ tr._fatal_error = mock.Mock()
tr._loop_reading()
self.assertTrue(tr._fatal_error.called)
@@ -95,8 +95,8 @@ class ProactorSocketTransportTests(unittest.TestCase):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
tr._closing = False
- tr._fatal_error = unittest.mock.Mock()
- tr._force_close = unittest.mock.Mock()
+ tr._fatal_error = mock.Mock()
+ tr._force_close = mock.Mock()
tr._loop_reading()
self.assertFalse(tr._fatal_error.called)
tr._force_close.assert_called_with(err)
@@ -105,7 +105,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
err = self.loop._proactor.recv.side_effect = (OSError())
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._fatal_error = unittest.mock.Mock()
+ tr._fatal_error = mock.Mock()
tr._loop_reading()
tr._fatal_error.assert_called_with(
err,
@@ -113,7 +113,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
def test_write(self):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._loop_writing = unittest.mock.Mock()
+ tr._loop_writing = mock.Mock()
tr.write(b'data')
self.assertEqual(tr._buffer, None)
tr._loop_writing.assert_called_with(data=b'data')
@@ -125,8 +125,8 @@ class ProactorSocketTransportTests(unittest.TestCase):
def test_write_more(self):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._write_fut = unittest.mock.Mock()
- tr._loop_writing = unittest.mock.Mock()
+ tr._write_fut = mock.Mock()
+ tr._loop_writing = mock.Mock()
tr.write(b'data')
self.assertEqual(tr._buffer, b'data')
self.assertFalse(tr._loop_writing.called)
@@ -139,11 +139,11 @@ class ProactorSocketTransportTests(unittest.TestCase):
self.loop._proactor.send.return_value.add_done_callback.\
assert_called_with(tr._loop_writing)
- @unittest.mock.patch('asyncio.proactor_events.logger')
+ @mock.patch('asyncio.proactor_events.logger')
def test_loop_writing_err(self, m_log):
err = self.loop._proactor.send.side_effect = OSError()
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._fatal_error = unittest.mock.Mock()
+ tr._fatal_error = mock.Mock()
tr._buffer = [b'da', b'ta']
tr._loop_writing()
tr._fatal_error.assert_called_with(
@@ -182,7 +182,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
def test_abort(self):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._force_close = unittest.mock.Mock()
+ tr._force_close = mock.Mock()
tr.abort()
tr._force_close.assert_called_with(None)
@@ -201,7 +201,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
def test_close_write_fut(self):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._write_fut = unittest.mock.Mock()
+ tr._write_fut = mock.Mock()
tr.close()
test_utils.run_briefly(self.loop)
self.assertFalse(self.protocol.connection_lost.called)
@@ -213,10 +213,10 @@ class ProactorSocketTransportTests(unittest.TestCase):
test_utils.run_briefly(self.loop)
self.assertFalse(self.protocol.connection_lost.called)
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_fatal_error(self, m_logging):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
- tr._force_close = unittest.mock.Mock()
+ tr._force_close = mock.Mock()
tr._fatal_error(None)
self.assertTrue(tr._force_close.called)
self.assertTrue(m_logging.error.called)
@@ -224,8 +224,8 @@ class ProactorSocketTransportTests(unittest.TestCase):
def test_force_close(self):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
tr._buffer = [b'data']
- read_fut = tr._read_fut = unittest.mock.Mock()
- write_fut = tr._write_fut = unittest.mock.Mock()
+ read_fut = tr._read_fut = mock.Mock()
+ write_fut = tr._write_fut = mock.Mock()
tr._force_close(None)
read_fut.cancel.assert_called_with()
@@ -346,10 +346,10 @@ class ProactorSocketTransportTests(unittest.TestCase):
class BaseProactorEventLoopTests(unittest.TestCase):
def setUp(self):
- self.sock = unittest.mock.Mock(socket.socket)
- self.proactor = unittest.mock.Mock()
+ self.sock = mock.Mock(socket.socket)
+ self.proactor = mock.Mock()
- self.ssock, self.csock = unittest.mock.Mock(), unittest.mock.Mock()
+ self.ssock, self.csock = mock.Mock(), mock.Mock()
class EventLoop(BaseProactorEventLoop):
def _socketpair(s):
@@ -357,11 +357,11 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.loop = EventLoop(self.proactor)
- @unittest.mock.patch.object(BaseProactorEventLoop, 'call_soon')
- @unittest.mock.patch.object(BaseProactorEventLoop, '_socketpair')
+ @mock.patch.object(BaseProactorEventLoop, 'call_soon')
+ @mock.patch.object(BaseProactorEventLoop, '_socketpair')
def test_ctor(self, socketpair, call_soon):
ssock, csock = socketpair.return_value = (
- unittest.mock.Mock(), unittest.mock.Mock())
+ mock.Mock(), mock.Mock())
loop = BaseProactorEventLoop(self.proactor)
self.assertIs(loop._ssock, ssock)
self.assertIs(loop._csock, csock)
@@ -377,7 +377,7 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.assertIsNone(self.loop._csock)
def test_close(self):
- self.loop._close_self_pipe = unittest.mock.Mock()
+ self.loop._close_self_pipe = mock.Mock()
self.loop.close()
self.assertTrue(self.loop._close_self_pipe.called)
self.assertTrue(self.proactor.close.called)
@@ -418,7 +418,7 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.loop._loop_self_reading)
def test_loop_self_reading_fut(self):
- fut = unittest.mock.Mock()
+ fut = mock.Mock()
self.loop._loop_self_reading(fut)
self.assertTrue(fut.result.called)
self.proactor.recv.assert_called_with(self.ssock, 4096)
@@ -426,7 +426,7 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.loop._loop_self_reading)
def test_loop_self_reading_exception(self):
- self.loop.close = unittest.mock.Mock()
+ self.loop.close = mock.Mock()
self.proactor.recv.side_effect = OSError()
self.assertRaises(OSError, self.loop._loop_self_reading)
self.assertTrue(self.loop.close.called)
@@ -438,10 +438,10 @@ class BaseProactorEventLoopTests(unittest.TestCase):
def test_process_events(self):
self.loop._process_events([])
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_create_server(self, m_log):
- pf = unittest.mock.Mock()
- call_soon = self.loop.call_soon = unittest.mock.Mock()
+ pf = mock.Mock()
+ call_soon = self.loop.call_soon = mock.Mock()
self.loop._start_serving(pf, self.sock)
self.assertTrue(call_soon.called)
@@ -452,10 +452,10 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.proactor.accept.assert_called_with(self.sock)
# conn
- fut = unittest.mock.Mock()
- fut.result.return_value = (unittest.mock.Mock(), unittest.mock.Mock())
+ fut = mock.Mock()
+ fut.result.return_value = (mock.Mock(), mock.Mock())
- make_tr = self.loop._make_socket_transport = unittest.mock.Mock()
+ make_tr = self.loop._make_socket_transport = mock.Mock()
loop(fut)
self.assertTrue(fut.result.called)
self.assertTrue(make_tr.called)
@@ -467,8 +467,8 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.assertTrue(m_log.error.called)
def test_create_server_cancel(self):
- pf = unittest.mock.Mock()
- call_soon = self.loop.call_soon = unittest.mock.Mock()
+ pf = mock.Mock()
+ call_soon = self.loop.call_soon = mock.Mock()
self.loop._start_serving(pf, self.sock)
loop = call_soon.call_args[0][0]
@@ -480,7 +480,7 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.assertTrue(self.sock.close.called)
def test_stop_serving(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
self.loop._stop_serving(sock)
self.assertTrue(sock.close.called)
self.proactor._stop_serving.assert_called_with(sock)
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py
index fc2bf46..f79fee2 100644
--- a/Lib/test/test_asyncio/test_queues.py
+++ b/Lib/test/test_asyncio/test_queues.py
@@ -1,7 +1,7 @@
"""Tests for queues.py"""
import unittest
-import unittest.mock
+from unittest import mock
import asyncio
from asyncio import test_utils
@@ -72,7 +72,7 @@ class QueueBasicTests(_QueueTestBase):
self.assertTrue('_queue=[1]' in fn(q))
def test_ctor_loop(self):
- loop = unittest.mock.Mock()
+ loop = mock.Mock()
q = asyncio.Queue(loop=loop)
self.assertIs(q._loop, loop)
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
index 247df9e..964b2e8 100644
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -1,13 +1,12 @@
"""Tests for selector_events.py"""
-import collections
import errno
import gc
import pprint
import socket
import sys
import unittest
-import unittest.mock
+from unittest import mock
try:
import ssl
except ImportError:
@@ -23,14 +22,14 @@ from asyncio.selector_events import _SelectorSocketTransport
from asyncio.selector_events import _SelectorDatagramTransport
-MOCK_ANY = unittest.mock.ANY
+MOCK_ANY = mock.ANY
class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
def _make_self_pipe(self):
- self._ssock = unittest.mock.Mock()
- self._csock = unittest.mock.Mock()
+ self._ssock = mock.Mock()
+ self._csock = mock.Mock()
self._internal_fds += 1
@@ -41,34 +40,34 @@ def list_to_buffer(l=()):
class BaseSelectorEventLoopTests(unittest.TestCase):
def setUp(self):
- selector = unittest.mock.Mock()
+ selector = mock.Mock()
self.loop = TestBaseSelectorEventLoop(selector)
def test_make_socket_transport(self):
- m = unittest.mock.Mock()
- self.loop.add_reader = unittest.mock.Mock()
+ m = mock.Mock()
+ self.loop.add_reader = mock.Mock()
transport = self.loop._make_socket_transport(m, asyncio.Protocol())
self.assertIsInstance(transport, _SelectorSocketTransport)
@unittest.skipIf(ssl is None, 'No ssl module')
def test_make_ssl_transport(self):
- m = unittest.mock.Mock()
- self.loop.add_reader = unittest.mock.Mock()
- self.loop.add_writer = unittest.mock.Mock()
- self.loop.remove_reader = unittest.mock.Mock()
- self.loop.remove_writer = unittest.mock.Mock()
+ m = mock.Mock()
+ self.loop.add_reader = mock.Mock()
+ self.loop.add_writer = mock.Mock()
+ self.loop.remove_reader = mock.Mock()
+ self.loop.remove_writer = mock.Mock()
waiter = asyncio.Future(loop=self.loop)
transport = self.loop._make_ssl_transport(
m, asyncio.Protocol(), m, waiter)
self.assertIsInstance(transport, _SelectorSslTransport)
- @unittest.mock.patch('asyncio.selector_events.ssl', None)
+ @mock.patch('asyncio.selector_events.ssl', None)
def test_make_ssl_transport_without_ssl_error(self):
- m = unittest.mock.Mock()
- self.loop.add_reader = unittest.mock.Mock()
- self.loop.add_writer = unittest.mock.Mock()
- self.loop.remove_reader = unittest.mock.Mock()
- self.loop.remove_writer = unittest.mock.Mock()
+ m = mock.Mock()
+ self.loop.add_reader = mock.Mock()
+ self.loop.add_writer = mock.Mock()
+ self.loop.remove_reader = mock.Mock()
+ self.loop.remove_writer = mock.Mock()
with self.assertRaises(RuntimeError):
self.loop._make_ssl_transport(m, m, m, m)
@@ -77,10 +76,10 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
ssock.fileno.return_value = 7
csock = self.loop._csock
csock.fileno.return_value = 1
- remove_reader = self.loop.remove_reader = unittest.mock.Mock()
+ remove_reader = self.loop.remove_reader = mock.Mock()
self.loop._selector.close()
- self.loop._selector = selector = unittest.mock.Mock()
+ self.loop._selector = selector = mock.Mock()
self.loop.close()
self.assertIsNone(self.loop._selector)
self.assertIsNone(self.loop._csock)
@@ -96,7 +95,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
def test_close_no_selector(self):
ssock = self.loop._ssock
csock = self.loop._csock
- remove_reader = self.loop.remove_reader = unittest.mock.Mock()
+ remove_reader = self.loop.remove_reader = mock.Mock()
self.loop._selector.close()
self.loop._selector = None
@@ -126,15 +125,15 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertRaises(OSError, self.loop._write_to_self)
def test_sock_recv(self):
- sock = unittest.mock.Mock()
- self.loop._sock_recv = unittest.mock.Mock()
+ sock = mock.Mock()
+ self.loop._sock_recv = mock.Mock()
f = self.loop.sock_recv(sock, 1024)
self.assertIsInstance(f, asyncio.Future)
self.loop._sock_recv.assert_called_with(f, False, sock, 1024)
def test__sock_recv_canceled_fut(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
f.cancel()
@@ -143,30 +142,30 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertFalse(sock.recv.called)
def test__sock_recv_unregister(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
f = asyncio.Future(loop=self.loop)
f.cancel()
- self.loop.remove_reader = unittest.mock.Mock()
+ self.loop.remove_reader = mock.Mock()
self.loop._sock_recv(f, True, sock, 1024)
self.assertEqual((10,), self.loop.remove_reader.call_args[0])
def test__sock_recv_tryagain(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.recv.side_effect = BlockingIOError
- self.loop.add_reader = unittest.mock.Mock()
+ self.loop.add_reader = mock.Mock()
self.loop._sock_recv(f, False, sock, 1024)
self.assertEqual((10, self.loop._sock_recv, f, True, sock, 1024),
self.loop.add_reader.call_args[0])
def test__sock_recv_exception(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
err = sock.recv.side_effect = OSError()
@@ -174,8 +173,8 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertIs(err, f.exception())
def test_sock_sendall(self):
- sock = unittest.mock.Mock()
- self.loop._sock_sendall = unittest.mock.Mock()
+ sock = mock.Mock()
+ self.loop._sock_sendall = mock.Mock()
f = self.loop.sock_sendall(sock, b'data')
self.assertIsInstance(f, asyncio.Future)
@@ -184,8 +183,8 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop._sock_sendall.call_args[0])
def test_sock_sendall_nodata(self):
- sock = unittest.mock.Mock()
- self.loop._sock_sendall = unittest.mock.Mock()
+ sock = mock.Mock()
+ self.loop._sock_sendall = mock.Mock()
f = self.loop.sock_sendall(sock, b'')
self.assertIsInstance(f, asyncio.Future)
@@ -194,7 +193,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertFalse(self.loop._sock_sendall.called)
def test__sock_sendall_canceled_fut(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
f.cancel()
@@ -203,23 +202,23 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertFalse(sock.send.called)
def test__sock_sendall_unregister(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
f = asyncio.Future(loop=self.loop)
f.cancel()
- self.loop.remove_writer = unittest.mock.Mock()
+ self.loop.remove_writer = mock.Mock()
self.loop._sock_sendall(f, True, sock, b'data')
self.assertEqual((10,), self.loop.remove_writer.call_args[0])
def test__sock_sendall_tryagain(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.send.side_effect = BlockingIOError
- self.loop.add_writer = unittest.mock.Mock()
+ self.loop.add_writer = mock.Mock()
self.loop._sock_sendall(f, False, sock, b'data')
self.assertEqual(
(10, self.loop._sock_sendall, f, True, sock, b'data'),
@@ -227,11 +226,11 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
def test__sock_sendall_interrupted(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.send.side_effect = InterruptedError
- self.loop.add_writer = unittest.mock.Mock()
+ self.loop.add_writer = mock.Mock()
self.loop._sock_sendall(f, False, sock, b'data')
self.assertEqual(
(10, self.loop._sock_sendall, f, True, sock, b'data'),
@@ -239,7 +238,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
def test__sock_sendall_exception(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
err = sock.send.side_effect = OSError()
@@ -247,7 +246,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertIs(f.exception(), err)
def test__sock_sendall(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
sock.fileno.return_value = 10
@@ -258,13 +257,13 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertIsNone(f.result())
def test__sock_sendall_partial(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
sock.fileno.return_value = 10
sock.send.return_value = 2
- self.loop.add_writer = unittest.mock.Mock()
+ self.loop.add_writer = mock.Mock()
self.loop._sock_sendall(f, False, sock, b'data')
self.assertFalse(f.done())
self.assertEqual(
@@ -272,13 +271,13 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop.add_writer.call_args[0])
def test__sock_sendall_none(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
sock.fileno.return_value = 10
sock.send.return_value = 0
- self.loop.add_writer = unittest.mock.Mock()
+ self.loop.add_writer = mock.Mock()
self.loop._sock_sendall(f, False, sock, b'data')
self.assertFalse(f.done())
self.assertEqual(
@@ -286,8 +285,8 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop.add_writer.call_args[0])
def test_sock_connect(self):
- sock = unittest.mock.Mock()
- self.loop._sock_connect = unittest.mock.Mock()
+ sock = mock.Mock()
+ self.loop._sock_connect = mock.Mock()
f = self.loop.sock_connect(sock, ('127.0.0.1', 8080))
self.assertIsInstance(f, asyncio.Future)
@@ -298,7 +297,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
def test__sock_connect(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
self.loop._sock_connect(f, False, sock, ('127.0.0.1', 8080))
@@ -307,7 +306,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertTrue(sock.connect.called)
def test__sock_connect_canceled_fut(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
f.cancel()
@@ -316,24 +315,24 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertFalse(sock.connect.called)
def test__sock_connect_unregister(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
f = asyncio.Future(loop=self.loop)
f.cancel()
- self.loop.remove_writer = unittest.mock.Mock()
+ self.loop.remove_writer = mock.Mock()
self.loop._sock_connect(f, True, sock, ('127.0.0.1', 8080))
self.assertEqual((10,), self.loop.remove_writer.call_args[0])
def test__sock_connect_tryagain(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.getsockopt.return_value = errno.EAGAIN
- self.loop.add_writer = unittest.mock.Mock()
- self.loop.remove_writer = unittest.mock.Mock()
+ self.loop.add_writer = mock.Mock()
+ self.loop.remove_writer = mock.Mock()
self.loop._sock_connect(f, True, sock, ('127.0.0.1', 8080))
self.assertEqual(
@@ -343,17 +342,17 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
def test__sock_connect_exception(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.getsockopt.return_value = errno.ENOTCONN
- self.loop.remove_writer = unittest.mock.Mock()
+ self.loop.remove_writer = mock.Mock()
self.loop._sock_connect(f, True, sock, ('127.0.0.1', 8080))
self.assertIsInstance(f.exception(), OSError)
def test_sock_accept(self):
- sock = unittest.mock.Mock()
- self.loop._sock_accept = unittest.mock.Mock()
+ sock = mock.Mock()
+ self.loop._sock_accept = mock.Mock()
f = self.loop.sock_accept(sock)
self.assertIsInstance(f, asyncio.Future)
@@ -363,9 +362,9 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
def test__sock_accept(self):
f = asyncio.Future(loop=self.loop)
- conn = unittest.mock.Mock()
+ conn = mock.Mock()
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.accept.return_value = conn, ('127.0.0.1', 1000)
@@ -375,7 +374,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertEqual((False,), conn.setblocking.call_args[0])
def test__sock_accept_canceled_fut(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
f = asyncio.Future(loop=self.loop)
f.cancel()
@@ -384,23 +383,23 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertFalse(sock.accept.called)
def test__sock_accept_unregister(self):
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
f = asyncio.Future(loop=self.loop)
f.cancel()
- self.loop.remove_reader = unittest.mock.Mock()
+ self.loop.remove_reader = mock.Mock()
self.loop._sock_accept(f, True, sock)
self.assertEqual((10,), self.loop.remove_reader.call_args[0])
def test__sock_accept_tryagain(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
sock.accept.side_effect = BlockingIOError
- self.loop.add_reader = unittest.mock.Mock()
+ self.loop.add_reader = mock.Mock()
self.loop._sock_accept(f, False, sock)
self.assertEqual(
(10, self.loop._sock_accept, f, True, sock),
@@ -408,7 +407,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
def test__sock_accept_exception(self):
f = asyncio.Future(loop=self.loop)
- sock = unittest.mock.Mock()
+ sock = mock.Mock()
sock.fileno.return_value = 10
err = sock.accept.side_effect = OSError()
@@ -428,8 +427,8 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertIsNone(w)
def test_add_reader_existing(self):
- reader = unittest.mock.Mock()
- writer = unittest.mock.Mock()
+ reader = mock.Mock()
+ writer = mock.Mock()
self.loop._selector.get_key.return_value = selectors.SelectorKey(
1, 1, selectors.EVENT_WRITE, (reader, writer))
cb = lambda: True
@@ -445,7 +444,7 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertEqual(writer, w)
def test_add_reader_existing_writer(self):
- writer = unittest.mock.Mock()
+ writer = mock.Mock()
self.loop._selector.get_key.return_value = selectors.SelectorKey(
1, 1, selectors.EVENT_WRITE, (None, writer))
cb = lambda: True
@@ -467,8 +466,8 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertTrue(self.loop._selector.unregister.called)
def test_remove_reader_read_write(self):
- reader = unittest.mock.Mock()
- writer = unittest.mock.Mock()
+ reader = mock.Mock()
+ writer = mock.Mock()
self.loop._selector.get_key.return_value = selectors.SelectorKey(
1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
(reader, writer))
@@ -498,8 +497,8 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertEqual(cb, w._callback)
def test_add_writer_existing(self):
- reader = unittest.mock.Mock()
- writer = unittest.mock.Mock()
+ reader = mock.Mock()
+ writer = mock.Mock()
self.loop._selector.get_key.return_value = selectors.SelectorKey(
1, 1, selectors.EVENT_READ, (reader, writer))
cb = lambda: True
@@ -522,8 +521,8 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.assertTrue(self.loop._selector.unregister.called)
def test_remove_writer_read_write(self):
- reader = unittest.mock.Mock()
- writer = unittest.mock.Mock()
+ reader = mock.Mock()
+ writer = mock.Mock()
self.loop._selector.get_key.return_value = selectors.SelectorKey(
1, 1, selectors.EVENT_READ | selectors.EVENT_WRITE,
(reader, writer))
@@ -541,10 +540,10 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop.remove_writer(1))
def test_process_events_read(self):
- reader = unittest.mock.Mock()
+ reader = mock.Mock()
reader._cancelled = False
- self.loop._add_callback = unittest.mock.Mock()
+ self.loop._add_callback = mock.Mock()
self.loop._process_events(
[(selectors.SelectorKey(
1, 1, selectors.EVENT_READ, (reader, None)),
@@ -553,10 +552,10 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop._add_callback.assert_called_with(reader)
def test_process_events_read_cancelled(self):
- reader = unittest.mock.Mock()
+ reader = mock.Mock()
reader.cancelled = True
- self.loop.remove_reader = unittest.mock.Mock()
+ self.loop.remove_reader = mock.Mock()
self.loop._process_events(
[(selectors.SelectorKey(
1, 1, selectors.EVENT_READ, (reader, None)),
@@ -564,10 +563,10 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop.remove_reader.assert_called_with(1)
def test_process_events_write(self):
- writer = unittest.mock.Mock()
+ writer = mock.Mock()
writer._cancelled = False
- self.loop._add_callback = unittest.mock.Mock()
+ self.loop._add_callback = mock.Mock()
self.loop._process_events(
[(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
(None, writer)),
@@ -575,9 +574,9 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop._add_callback.assert_called_with(writer)
def test_process_events_write_cancelled(self):
- writer = unittest.mock.Mock()
+ writer = mock.Mock()
writer.cancelled = True
- self.loop.remove_writer = unittest.mock.Mock()
+ self.loop.remove_writer = mock.Mock()
self.loop._process_events(
[(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
@@ -591,7 +590,7 @@ class SelectorTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
- self.sock = unittest.mock.Mock(socket.socket)
+ self.sock = mock.Mock(socket.socket)
self.sock.fileno.return_value = 7
def test_ctor(self):
@@ -602,7 +601,7 @@ class SelectorTransportTests(unittest.TestCase):
def test_abort(self):
tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
- tr._force_close = unittest.mock.Mock()
+ tr._force_close = mock.Mock()
tr.abort()
tr._force_close.assert_called_with(None)
@@ -632,8 +631,8 @@ class SelectorTransportTests(unittest.TestCase):
def test_force_close(self):
tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
tr._buffer.extend(b'1')
- self.loop.add_reader(7, unittest.mock.sentinel)
- self.loop.add_writer(7, unittest.mock.sentinel)
+ self.loop.add_reader(7, mock.sentinel)
+ self.loop.add_writer(7, mock.sentinel)
tr._force_close(None)
self.assertTrue(tr._closing)
@@ -646,11 +645,11 @@ class SelectorTransportTests(unittest.TestCase):
self.assertFalse(self.loop.readers)
self.assertEqual(1, self.loop.remove_reader_count[7])
- @unittest.mock.patch('asyncio.log.logger.error')
+ @mock.patch('asyncio.log.logger.error')
def test_fatal_error(self, m_exc):
exc = OSError()
tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
- tr._force_close = unittest.mock.Mock()
+ tr._force_close = mock.Mock()
tr._fatal_error(exc)
m_exc.assert_called_with(
@@ -682,7 +681,7 @@ class SelectorSocketTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
- self.sock = unittest.mock.Mock(socket.socket)
+ self.sock = mock.Mock(socket.socket)
self.sock_fd = self.sock.fileno.return_value = 7
def test_ctor(self):
@@ -724,7 +723,7 @@ class SelectorSocketTransportTests(unittest.TestCase):
def test_read_ready_eof(self):
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport.close = unittest.mock.Mock()
+ transport.close = mock.Mock()
self.sock.recv.return_value = b''
transport._read_ready()
@@ -735,7 +734,7 @@ class SelectorSocketTransportTests(unittest.TestCase):
def test_read_ready_eof_keep_open(self):
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport.close = unittest.mock.Mock()
+ transport.close = mock.Mock()
self.sock.recv.return_value = b''
self.protocol.eof_received.return_value = True
@@ -744,45 +743,45 @@ class SelectorSocketTransportTests(unittest.TestCase):
self.protocol.eof_received.assert_called_with()
self.assertFalse(transport.close.called)
- @unittest.mock.patch('logging.exception')
+ @mock.patch('logging.exception')
def test_read_ready_tryagain(self, m_exc):
self.sock.recv.side_effect = BlockingIOError
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._read_ready()
self.assertFalse(transport._fatal_error.called)
- @unittest.mock.patch('logging.exception')
+ @mock.patch('logging.exception')
def test_read_ready_tryagain_interrupted(self, m_exc):
self.sock.recv.side_effect = InterruptedError
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._read_ready()
self.assertFalse(transport._fatal_error.called)
- @unittest.mock.patch('logging.exception')
+ @mock.patch('logging.exception')
def test_read_ready_conn_reset(self, m_exc):
err = self.sock.recv.side_effect = ConnectionResetError()
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport._force_close = unittest.mock.Mock()
+ transport._force_close = mock.Mock()
transport._read_ready()
transport._force_close.assert_called_with(err)
- @unittest.mock.patch('logging.exception')
+ @mock.patch('logging.exception')
def test_read_ready_err(self, m_exc):
err = self.sock.recv.side_effect = OSError()
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._read_ready()
transport._fatal_error.assert_called_with(
@@ -891,14 +890,14 @@ class SelectorSocketTransportTests(unittest.TestCase):
self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)
- @unittest.mock.patch('asyncio.selector_events.logger')
+ @mock.patch('asyncio.selector_events.logger')
def test_write_exception(self, m_log):
err = self.sock.send.side_effect = OSError()
data = b'data'
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport.write(data)
transport._fatal_error.assert_called_with(
err,
@@ -1002,17 +1001,17 @@ class SelectorSocketTransportTests(unittest.TestCase):
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._buffer.extend(b'data')
transport._write_ready()
transport._fatal_error.assert_called_with(
err,
'Fatal write error on socket transport')
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.base_events.logger')
def test_write_ready_exception_and_close(self, m_log):
self.sock.send.side_effect = OSError()
- remove_writer = self.loop.remove_writer = unittest.mock.Mock()
+ remove_writer = self.loop.remove_writer = mock.Mock()
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
@@ -1053,11 +1052,11 @@ class SelectorSslTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
- self.sock = unittest.mock.Mock(socket.socket)
+ self.sock = mock.Mock(socket.socket)
self.sock.fileno.return_value = 7
- self.sslsock = unittest.mock.Mock()
+ self.sslsock = mock.Mock()
self.sslsock.fileno.return_value = 1
- self.sslcontext = unittest.mock.Mock()
+ self.sslcontext = mock.Mock()
self.sslcontext.wrap_socket.return_value = self.sslsock
def _make_one(self, create_waiter=None):
@@ -1162,7 +1161,7 @@ class SelectorSslTransportTests(unittest.TestCase):
transport.write(b'data')
self.assertEqual(transport._conn_lost, 2)
- @unittest.mock.patch('asyncio.selector_events.logger')
+ @mock.patch('asyncio.selector_events.logger')
def test_write_exception(self, m_log):
transport = self._make_one()
transport._conn_lost = 1
@@ -1182,11 +1181,11 @@ class SelectorSslTransportTests(unittest.TestCase):
self.assertEqual((b'data',), self.protocol.data_received.call_args[0])
def test_read_ready_write_wants_read(self):
- self.loop.add_writer = unittest.mock.Mock()
+ self.loop.add_writer = mock.Mock()
self.sslsock.recv.side_effect = BlockingIOError
transport = self._make_one()
transport._write_wants_read = True
- transport._write_ready = unittest.mock.Mock()
+ transport._write_ready = mock.Mock()
transport._buffer.extend(b'data')
transport._read_ready()
@@ -1198,7 +1197,7 @@ class SelectorSslTransportTests(unittest.TestCase):
def test_read_ready_recv_eof(self):
self.sslsock.recv.return_value = b''
transport = self._make_one()
- transport.close = unittest.mock.Mock()
+ transport.close = mock.Mock()
transport._read_ready()
transport.close.assert_called_with()
self.protocol.eof_received.assert_called_with()
@@ -1206,7 +1205,7 @@ class SelectorSslTransportTests(unittest.TestCase):
def test_read_ready_recv_conn_reset(self):
err = self.sslsock.recv.side_effect = ConnectionResetError()
transport = self._make_one()
- transport._force_close = unittest.mock.Mock()
+ transport._force_close = mock.Mock()
transport._read_ready()
transport._force_close.assert_called_with(err)
@@ -1226,8 +1225,8 @@ class SelectorSslTransportTests(unittest.TestCase):
self.assertFalse(self.protocol.data_received.called)
def test_read_ready_recv_write(self):
- self.loop.remove_reader = unittest.mock.Mock()
- self.loop.add_writer = unittest.mock.Mock()
+ self.loop.remove_reader = mock.Mock()
+ self.loop.add_writer = mock.Mock()
self.sslsock.recv.side_effect = ssl.SSLWantWriteError
transport = self._make_one()
transport._read_ready()
@@ -1241,7 +1240,7 @@ class SelectorSslTransportTests(unittest.TestCase):
def test_read_ready_recv_exc(self):
err = self.sslsock.recv.side_effect = OSError()
transport = self._make_one()
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._read_ready()
transport._fatal_error.assert_called_with(
err,
@@ -1313,7 +1312,7 @@ class SelectorSslTransportTests(unittest.TestCase):
transport = self._make_one()
transport._buffer = list_to_buffer([b'data'])
- self.loop.remove_writer = unittest.mock.Mock()
+ self.loop.remove_writer = mock.Mock()
self.sslsock.send.side_effect = ssl.SSLWantReadError
transport._write_ready()
self.assertFalse(self.protocol.data_received.called)
@@ -1325,7 +1324,7 @@ class SelectorSslTransportTests(unittest.TestCase):
transport = self._make_one()
transport._buffer = list_to_buffer([b'data'])
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._write_ready()
transport._fatal_error.assert_called_with(
err,
@@ -1333,11 +1332,11 @@ class SelectorSslTransportTests(unittest.TestCase):
self.assertEqual(list_to_buffer(), transport._buffer)
def test_write_ready_read_wants_write(self):
- self.loop.add_reader = unittest.mock.Mock()
+ self.loop.add_reader = mock.Mock()
self.sslsock.send.side_effect = BlockingIOError
transport = self._make_one()
transport._read_wants_write = True
- transport._read_ready = unittest.mock.Mock()
+ transport._read_ready = mock.Mock()
transport._write_ready()
self.assertFalse(transport._read_wants_write)
@@ -1374,11 +1373,11 @@ class SelectorSslTransportTests(unittest.TestCase):
class SelectorSslWithoutSslTransportTests(unittest.TestCase):
- @unittest.mock.patch('asyncio.selector_events.ssl', None)
+ @mock.patch('asyncio.selector_events.ssl', None)
def test_ssl_transport_requires_ssl_module(self):
- Mock = unittest.mock.Mock
+ Mock = mock.Mock
with self.assertRaises(RuntimeError):
- transport = _SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
+ _SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
class SelectorDatagramTransportTests(unittest.TestCase):
@@ -1386,7 +1385,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
- self.sock = unittest.mock.Mock(spec_set=socket.socket)
+ self.sock = mock.Mock(spec_set=socket.socket)
self.sock.fileno.return_value = 7
def test_read_ready(self):
@@ -1404,7 +1403,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
self.loop, self.sock, self.protocol)
self.sock.recvfrom.side_effect = BlockingIOError
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._read_ready()
self.assertFalse(transport._fatal_error.called)
@@ -1414,7 +1413,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
self.loop, self.sock, self.protocol)
err = self.sock.recvfrom.side_effect = RuntimeError()
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._read_ready()
transport._fatal_error.assert_called_with(
@@ -1426,7 +1425,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
self.loop, self.sock, self.protocol)
err = self.sock.recvfrom.side_effect = OSError()
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._read_ready()
self.assertFalse(transport._fatal_error.called)
@@ -1518,14 +1517,14 @@ class SelectorDatagramTransportTests(unittest.TestCase):
self.assertEqual(
[(b'data', ('0.0.0.0', 12345))], list(transport._buffer))
- @unittest.mock.patch('asyncio.selector_events.logger')
+ @mock.patch('asyncio.selector_events.logger')
def test_sendto_exception(self, m_log):
data = b'data'
err = self.sock.sendto.side_effect = RuntimeError()
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport.sendto(data, ())
self.assertTrue(transport._fatal_error.called)
@@ -1549,7 +1548,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport.sendto(data, ())
self.assertEqual(transport._conn_lost, 0)
@@ -1562,7 +1561,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol, ('0.0.0.0', 1))
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport.sendto(data)
self.assertFalse(transport._fatal_error.called)
@@ -1643,7 +1642,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._buffer.append((b'data', ()))
transport._sendto_ready()
@@ -1656,7 +1655,7 @@ class SelectorDatagramTransportTests(unittest.TestCase):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol)
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._buffer.append((b'data', ()))
transport._sendto_ready()
@@ -1667,14 +1666,14 @@ class SelectorDatagramTransportTests(unittest.TestCase):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol, ('0.0.0.0', 1))
- transport._fatal_error = unittest.mock.Mock()
+ transport._fatal_error = mock.Mock()
transport._buffer.append((b'data', ()))
transport._sendto_ready()
self.assertFalse(transport._fatal_error.called)
self.assertTrue(self.protocol.error_received.called)
- @unittest.mock.patch('asyncio.base_events.logger.error')
+ @mock.patch('asyncio.base_events.logger.error')
def test_fatal_error_connected(self, m_exc):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol, ('0.0.0.0', 1))
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index ca792f2..031499e 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -1,10 +1,9 @@
"""Tests for streams.py."""
-import functools
import gc
import socket
import unittest
-import unittest.mock
+from unittest import mock
try:
import ssl
except ImportError:
@@ -29,7 +28,7 @@ class StreamReaderTests(unittest.TestCase):
self.loop.close()
gc.collect()
- @unittest.mock.patch('asyncio.streams.events')
+ @mock.patch('asyncio.streams.events')
def test_ctor_global_loop(self, m_events):
stream = asyncio.StreamReader()
self.assertIs(stream._loop, m_events.get_event_loop.return_value)
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 6d03dc7..ced3431 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -830,7 +830,7 @@ class TaskTests(unittest.TestCase):
v = yield from f
self.assertEqual(v, 'a')
- res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
+ loop.run_until_complete(asyncio.Task(foo(), loop=loop))
def test_as_completed_reverse_wait(self):
@@ -964,13 +964,9 @@ class TaskTests(unittest.TestCase):
loop = test_utils.TestLoop(gen)
self.addCleanup(loop.close)
- sleepfut = None
-
@asyncio.coroutine
def sleep(dt):
- nonlocal sleepfut
- sleepfut = asyncio.sleep(dt, loop=loop)
- yield from sleepfut
+ yield from asyncio.sleep(dt, loop=loop)
@asyncio.coroutine
def doit():
diff --git a/Lib/test/test_asyncio/test_transports.py b/Lib/test/test_asyncio/test_transports.py
index 4c64526..cfbdf3e 100644
--- a/Lib/test/test_asyncio/test_transports.py
+++ b/Lib/test/test_asyncio/test_transports.py
@@ -1,7 +1,7 @@
"""Tests for transports.py."""
import unittest
-import unittest.mock
+from unittest import mock
import asyncio
from asyncio import transports
@@ -23,7 +23,7 @@ class TransportTests(unittest.TestCase):
def test_writelines(self):
transport = asyncio.Transport()
- transport.write = unittest.mock.Mock()
+ transport.write = mock.Mock()
transport.writelines([b'line1',
bytearray(b'line2'),
@@ -70,7 +70,7 @@ class TransportTests(unittest.TestCase):
return 512
transport = MyTransport()
- transport._protocol = unittest.mock.Mock()
+ transport._protocol = mock.Mock()
self.assertFalse(transport._protocol_paused)
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 9e489c2..cc74383 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -13,7 +13,7 @@ import sys
import tempfile
import threading
import unittest
-import unittest.mock
+from unittest import mock
if sys.platform == 'win32':
raise unittest.SkipTest('UNIX only')
@@ -25,7 +25,7 @@ from asyncio import test_utils
from asyncio import unix_events
-MOCK_ANY = unittest.mock.ANY
+MOCK_ANY = mock.ANY
@unittest.skipUnless(signal, 'Signals are not supported')
@@ -48,15 +48,15 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.loop._handle_signal(signal.NSIG + 1, ())
def test_handle_signal_cancelled_handler(self):
- h = asyncio.Handle(unittest.mock.Mock(), (),
- loop=unittest.mock.Mock())
+ h = asyncio.Handle(mock.Mock(), (),
+ loop=mock.Mock())
h.cancel()
self.loop._signal_handlers[signal.NSIG + 1] = h
- self.loop.remove_signal_handler = unittest.mock.Mock()
+ self.loop.remove_signal_handler = mock.Mock()
self.loop._handle_signal(signal.NSIG + 1, ())
self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_add_signal_handler_setup_error(self, m_signal):
m_signal.NSIG = signal.NSIG
m_signal.set_wakeup_fd.side_effect = ValueError
@@ -66,7 +66,7 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.loop.add_signal_handler,
signal.SIGINT, lambda: True)
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_add_signal_handler(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -76,7 +76,7 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.assertIsInstance(h, asyncio.Handle)
self.assertEqual(h._callback, cb)
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_add_signal_handler_install_error(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -94,8 +94,8 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.loop.add_signal_handler,
signal.SIGINT, lambda: True)
- @unittest.mock.patch('asyncio.unix_events.signal')
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.base_events.logger')
def test_add_signal_handler_install_error2(self, m_logging, m_signal):
m_signal.NSIG = signal.NSIG
@@ -111,8 +111,8 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.assertFalse(m_logging.info.called)
self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
- @unittest.mock.patch('asyncio.unix_events.signal')
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.base_events.logger')
def test_add_signal_handler_install_error3(self, m_logging, m_signal):
class Err(OSError):
errno = errno.EINVAL
@@ -126,7 +126,7 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.assertFalse(m_logging.info.called)
self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_remove_signal_handler(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -139,7 +139,7 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.assertEqual(
(signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_remove_signal_handler_2(self, m_signal):
m_signal.NSIG = signal.NSIG
m_signal.SIGINT = signal.SIGINT
@@ -156,8 +156,8 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
(signal.SIGINT, m_signal.default_int_handler),
m_signal.signal.call_args[0])
- @unittest.mock.patch('asyncio.unix_events.signal')
- @unittest.mock.patch('asyncio.base_events.logger')
+ @mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.base_events.logger')
def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
m_signal.NSIG = signal.NSIG
self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
@@ -167,7 +167,7 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.loop.remove_signal_handler(signal.SIGHUP)
self.assertTrue(m_logging.info)
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_remove_signal_handler_error(self, m_signal):
m_signal.NSIG = signal.NSIG
self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
@@ -177,7 +177,7 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.assertRaises(
OSError, self.loop.remove_signal_handler, signal.SIGHUP)
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_remove_signal_handler_error2(self, m_signal):
m_signal.NSIG = signal.NSIG
self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
@@ -189,7 +189,7 @@ class SelectorEventLoopSignalTests(unittest.TestCase):
self.assertRaises(
RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
- @unittest.mock.patch('asyncio.unix_events.signal')
+ @mock.patch('asyncio.unix_events.signal')
def test_close(self, m_signal):
m_signal.NSIG = signal.NSIG
@@ -291,16 +291,16 @@ class UnixReadPipeTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
- self.pipe = unittest.mock.Mock(spec_set=io.RawIOBase)
+ self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
- fcntl_patcher = unittest.mock.patch('fcntl.fcntl')
+ fcntl_patcher = mock.patch('fcntl.fcntl')
fcntl_patcher.start()
self.addCleanup(fcntl_patcher.stop)
- fstat_patcher = unittest.mock.patch('os.fstat')
+ fstat_patcher = mock.patch('os.fstat')
m_fstat = fstat_patcher.start()
- st = unittest.mock.Mock()
+ st = mock.Mock()
st.st_mode = stat.S_IFIFO
m_fstat.return_value = st
self.addCleanup(fstat_patcher.stop)
@@ -319,7 +319,7 @@ class UnixReadPipeTransportTests(unittest.TestCase):
test_utils.run_briefly(self.loop)
self.assertIsNone(fut.result())
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test__read_ready(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
@@ -329,7 +329,7 @@ class UnixReadPipeTransportTests(unittest.TestCase):
m_read.assert_called_with(5, tr.max_size)
self.protocol.data_received.assert_called_with(b'data')
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test__read_ready_eof(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
@@ -342,7 +342,7 @@ class UnixReadPipeTransportTests(unittest.TestCase):
self.protocol.eof_received.assert_called_with()
self.protocol.connection_lost.assert_called_with(None)
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test__read_ready_blocked(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
@@ -353,14 +353,14 @@ class UnixReadPipeTransportTests(unittest.TestCase):
test_utils.run_briefly(self.loop)
self.assertFalse(self.protocol.data_received.called)
- @unittest.mock.patch('asyncio.log.logger.error')
- @unittest.mock.patch('os.read')
+ @mock.patch('asyncio.log.logger.error')
+ @mock.patch('os.read')
def test__read_ready_error(self, m_read, m_logexc):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
err = OSError()
m_read.side_effect = err
- tr._close = unittest.mock.Mock()
+ tr._close = mock.Mock()
tr._read_ready()
m_read.assert_called_with(5, tr.max_size)
@@ -371,17 +371,17 @@ class UnixReadPipeTransportTests(unittest.TestCase):
'\nprotocol:.*\ntransport:.*'),
exc_info=(OSError, MOCK_ANY, MOCK_ANY))
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test_pause_reading(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
- m = unittest.mock.Mock()
+ m = mock.Mock()
self.loop.add_reader(5, m)
tr.pause_reading()
self.assertFalse(self.loop.readers)
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test_resume_reading(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
@@ -389,26 +389,26 @@ class UnixReadPipeTransportTests(unittest.TestCase):
tr.resume_reading()
self.loop.assert_reader(5, tr._read_ready)
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test_close(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
- tr._close = unittest.mock.Mock()
+ tr._close = mock.Mock()
tr.close()
tr._close.assert_called_with(None)
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test_close_already_closing(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
tr._closing = True
- tr._close = unittest.mock.Mock()
+ tr._close = mock.Mock()
tr.close()
self.assertFalse(tr._close.called)
- @unittest.mock.patch('os.read')
+ @mock.patch('os.read')
def test__close(self, m_read):
tr = unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol)
@@ -459,16 +459,16 @@ class UnixWritePipeTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
- self.pipe = unittest.mock.Mock(spec_set=io.RawIOBase)
+ self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
- fcntl_patcher = unittest.mock.patch('fcntl.fcntl')
+ fcntl_patcher = mock.patch('fcntl.fcntl')
fcntl_patcher.start()
self.addCleanup(fcntl_patcher.stop)
- fstat_patcher = unittest.mock.patch('os.fstat')
+ fstat_patcher = mock.patch('os.fstat')
m_fstat = fstat_patcher.start()
- st = unittest.mock.Mock()
+ st = mock.Mock()
st.st_mode = stat.S_IFSOCK
m_fstat.return_value = st
self.addCleanup(fstat_patcher.stop)
@@ -493,7 +493,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.loop, self.pipe, self.protocol)
self.assertTrue(tr.can_write_eof())
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test_write(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -504,7 +504,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test_write_no_data(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -514,7 +514,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test_write_partial(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -525,7 +525,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'ta'], tr._buffer)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test_write_buffer(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -537,7 +537,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'previous', b'data'], tr._buffer)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test_write_again(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -548,15 +548,15 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
- @unittest.mock.patch('asyncio.unix_events.logger')
- @unittest.mock.patch('os.write')
+ @mock.patch('asyncio.unix_events.logger')
+ @mock.patch('os.write')
def test_write_err(self, m_write, m_log):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
err = OSError()
m_write.side_effect = err
- tr._fatal_error = unittest.mock.Mock()
+ tr._fatal_error = mock.Mock()
tr.write(b'data')
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers)
@@ -576,7 +576,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
m_log.warning.assert_called_with(
'pipe closed by peer or os.write(pipe, data) raised exception.')
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test_write_close(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -597,7 +597,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
test_utils.run_briefly(self.loop)
self.protocol.connection_lost.assert_called_with(None)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test__write_ready(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -609,7 +609,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test__write_ready_partial(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -622,7 +622,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'a'], tr._buffer)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test__write_ready_again(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -635,7 +635,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test__write_ready_empty(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -648,8 +648,8 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
- @unittest.mock.patch('asyncio.log.logger.error')
- @unittest.mock.patch('os.write')
+ @mock.patch('asyncio.log.logger.error')
+ @mock.patch('os.write')
def test__write_ready_err(self, m_write, m_logexc):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -672,7 +672,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
test_utils.run_briefly(self.loop)
self.protocol.connection_lost.assert_called_with(err)
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test__write_ready_closing(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -689,7 +689,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.protocol.connection_lost.assert_called_with(None)
self.pipe.close.assert_called_with()
- @unittest.mock.patch('os.write')
+ @mock.patch('os.write')
def test_abort(self, m_write):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
@@ -742,7 +742,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
- tr.write_eof = unittest.mock.Mock()
+ tr.write_eof = mock.Mock()
tr.close()
tr.write_eof.assert_called_with()
@@ -750,7 +750,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol)
- tr.write_eof = unittest.mock.Mock()
+ tr.write_eof = mock.Mock()
tr._closing = True
tr.close()
self.assertFalse(tr.write_eof.called)
@@ -777,7 +777,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
class AbstractChildWatcherTests(unittest.TestCase):
def test_not_implemented(self):
- f = unittest.mock.Mock()
+ f = mock.Mock()
watcher = asyncio.AbstractChildWatcher()
self.assertRaises(
NotImplementedError, watcher.add_child_handler, f, f)
@@ -796,7 +796,7 @@ class AbstractChildWatcherTests(unittest.TestCase):
class BaseChildWatcherTests(unittest.TestCase):
def test_not_implemented(self):
- f = unittest.mock.Mock()
+ f = mock.Mock()
watcher = unix_events.BaseChildWatcher()
self.assertRaises(
NotImplementedError, watcher._do_waitpid, f)
@@ -813,14 +813,14 @@ WaitPidMocks = collections.namedtuple("WaitPidMocks",
class ChildWatcherTestsMixin:
- ignore_warnings = unittest.mock.patch.object(log.logger, "warning")
+ ignore_warnings = mock.patch.object(log.logger, "warning")
def setUp(self):
self.loop = test_utils.TestLoop()
self.running = False
self.zombies = {}
- with unittest.mock.patch.object(
+ with mock.patch.object(
self.loop, "add_signal_handler") as self.m_add_signal_handler:
self.watcher = self.create_watcher()
self.watcher.attach_loop(self.loop)
@@ -864,8 +864,8 @@ class ChildWatcherTestsMixin:
def waitpid_mocks(func):
def wrapped_func(self):
def patch(target, wrapper):
- return unittest.mock.patch(target, wraps=wrapper,
- new_callable=unittest.mock.Mock)
+ return mock.patch(target, wraps=wrapper,
+ new_callable=mock.Mock)
with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
@@ -881,7 +881,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld(self, m):
# register a child
- callback = unittest.mock.Mock()
+ callback = mock.Mock()
with self.watcher:
self.running = True
@@ -941,8 +941,8 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_two_children(self, m):
- callback1 = unittest.mock.Mock()
- callback2 = unittest.mock.Mock()
+ callback1 = mock.Mock()
+ callback2 = mock.Mock()
# register child 1
with self.watcher:
@@ -1045,8 +1045,8 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_two_children_terminating_together(self, m):
- callback1 = unittest.mock.Mock()
- callback2 = unittest.mock.Mock()
+ callback1 = mock.Mock()
+ callback2 = mock.Mock()
# register child 1
with self.watcher:
@@ -1115,7 +1115,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_race_condition(self, m):
# register a child
- callback = unittest.mock.Mock()
+ callback = mock.Mock()
with self.watcher:
# child terminates before being registered
@@ -1136,8 +1136,8 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_replace_handler(self, m):
- callback1 = unittest.mock.Mock()
- callback2 = unittest.mock.Mock()
+ callback1 = mock.Mock()
+ callback2 = mock.Mock()
# register a child
with self.watcher:
@@ -1189,7 +1189,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_remove_handler(self, m):
- callback = unittest.mock.Mock()
+ callback = mock.Mock()
# register a child
with self.watcher:
@@ -1221,7 +1221,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_unknown_status(self, m):
- callback = unittest.mock.Mock()
+ callback = mock.Mock()
# register a child
with self.watcher:
@@ -1258,9 +1258,9 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_remove_child_handler(self, m):
- callback1 = unittest.mock.Mock()
- callback2 = unittest.mock.Mock()
- callback3 = unittest.mock.Mock()
+ callback1 = mock.Mock()
+ callback2 = mock.Mock()
+ callback3 = mock.Mock()
# register children
with self.watcher:
@@ -1291,7 +1291,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_unhandled_exception(self, m):
- callback = unittest.mock.Mock()
+ callback = mock.Mock()
# register a child
with self.watcher:
@@ -1301,8 +1301,8 @@ class ChildWatcherTestsMixin:
# raise an exception
m.waitpid.side_effect = ValueError
- with unittest.mock.patch.object(log.logger,
- 'error') as m_error:
+ with mock.patch.object(log.logger,
+ 'error') as m_error:
self.assertEqual(self.watcher._sig_chld(), None)
self.assertTrue(m_error.called)
@@ -1310,7 +1310,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_child_reaped_elsewhere(self, m):
# register a child
- callback = unittest.mock.Mock()
+ callback = mock.Mock()
with self.watcher:
self.running = True
@@ -1346,8 +1346,8 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_sigchld_unknown_pid_during_registration(self, m):
# register two children
- callback1 = unittest.mock.Mock()
- callback2 = unittest.mock.Mock()
+ callback1 = mock.Mock()
+ callback2 = mock.Mock()
with self.ignore_warnings, self.watcher:
self.running = True
@@ -1367,7 +1367,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_set_loop(self, m):
# register a child
- callback = unittest.mock.Mock()
+ callback = mock.Mock()
with self.watcher:
self.running = True
@@ -1376,19 +1376,16 @@ class ChildWatcherTestsMixin:
# attach a new loop
old_loop = self.loop
self.loop = test_utils.TestLoop()
+ patch = mock.patch.object
- with unittest.mock.patch.object(
- old_loop,
- "remove_signal_handler") as m_old_remove_signal_handler, \
- unittest.mock.patch.object(
- self.loop,
- "add_signal_handler") as m_new_add_signal_handler:
+ with patch(old_loop, "remove_signal_handler") as m_old_remove, \
+ patch(self.loop, "add_signal_handler") as m_new_add:
self.watcher.attach_loop(self.loop)
- m_old_remove_signal_handler.assert_called_once_with(
+ m_old_remove.assert_called_once_with(
signal.SIGCHLD)
- m_new_add_signal_handler.assert_called_once_with(
+ m_new_add.assert_called_once_with(
signal.SIGCHLD, self.watcher._sig_chld)
# child terminates
@@ -1401,9 +1398,9 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_set_loop_race_condition(self, m):
# register 3 children
- callback1 = unittest.mock.Mock()
- callback2 = unittest.mock.Mock()
- callback3 = unittest.mock.Mock()
+ callback1 = mock.Mock()
+ callback2 = mock.Mock()
+ callback3 = mock.Mock()
with self.watcher:
self.running = True
@@ -1415,7 +1412,7 @@ class ChildWatcherTestsMixin:
old_loop = self.loop
self.loop = None
- with unittest.mock.patch.object(
+ with mock.patch.object(
old_loop, "remove_signal_handler") as m_remove_signal_handler:
self.watcher.attach_loop(None)
@@ -1435,7 +1432,7 @@ class ChildWatcherTestsMixin:
# attach a new loop
self.loop = test_utils.TestLoop()
- with unittest.mock.patch.object(
+ with mock.patch.object(
self.loop, "add_signal_handler") as m_add_signal_handler:
self.watcher.attach_loop(self.loop)
@@ -1461,8 +1458,7 @@ class ChildWatcherTestsMixin:
@waitpid_mocks
def test_close(self, m):
# register two children
- callback1 = unittest.mock.Mock()
- callback2 = unittest.mock.Mock()
+ callback1 = mock.Mock()
with self.watcher:
self.running = True
@@ -1479,7 +1475,7 @@ class ChildWatcherTestsMixin:
if isinstance(self.watcher, asyncio.FastChildWatcher):
self.assertEqual(len(self.watcher._zombies), 1)
- with unittest.mock.patch.object(
+ with mock.patch.object(
self.loop,
"remove_signal_handler") as m_remove_signal_handler:
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
index 846049a..f652258 100644
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -8,7 +8,6 @@ if sys.platform != 'win32':
import _winapi
import asyncio
-from asyncio import test_utils
from asyncio import _overlapped
from asyncio import windows_events
@@ -50,7 +49,7 @@ class ProactorTests(unittest.TestCase):
ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
server1 = windows_events.PipeServer(ADDRESS)
with self.assertRaises(PermissionError):
- server2 = windows_events.PipeServer(ADDRESS)
+ windows_events.PipeServer(ADDRESS)
server1.close()
def test_pipe(self):
diff --git a/Lib/test/test_asyncio/test_windows_utils.py b/Lib/test/test_asyncio/test_windows_utils.py
index fa9d66c..9daf434 100644
--- a/Lib/test/test_asyncio/test_windows_utils.py
+++ b/Lib/test/test_asyncio/test_windows_utils.py
@@ -1,9 +1,11 @@
"""Tests for window_utils"""
+import socket
import sys
import test.support
import unittest
-import unittest.mock
+from test.support import IPV6_ENABLED
+from unittest import mock
if sys.platform != 'win32':
raise unittest.SkipTest('Windows only')
@@ -16,23 +18,40 @@ from asyncio import _overlapped
class WinsocketpairTests(unittest.TestCase):
- def test_winsocketpair(self):
- ssock, csock = windows_utils.socketpair()
-
+ def check_winsocketpair(self, ssock, csock):
csock.send(b'xxx')
self.assertEqual(b'xxx', ssock.recv(1024))
-
csock.close()
ssock.close()
- @unittest.mock.patch('asyncio.windows_utils.socket')
+ def test_winsocketpair(self):
+ ssock, csock = windows_utils.socketpair()
+ self.check_winsocketpair(ssock, csock)
+
+ @unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled')
+ def test_winsocketpair_ipv6(self):
+ ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
+ self.check_winsocketpair(ssock, csock)
+
+ @mock.patch('asyncio.windows_utils.socket')
def test_winsocketpair_exc(self, m_socket):
+ m_socket.AF_INET = socket.AF_INET
+ m_socket.SOCK_STREAM = socket.SOCK_STREAM
m_socket.socket.return_value.getsockname.return_value = ('', 12345)
m_socket.socket.return_value.accept.return_value = object(), object()
m_socket.socket.return_value.connect.side_effect = OSError()
self.assertRaises(OSError, windows_utils.socketpair)
+ def test_winsocketpair_invalid_args(self):
+ self.assertRaises(ValueError,
+ windows_utils.socketpair, family=socket.AF_UNSPEC)
+ self.assertRaises(ValueError,
+ windows_utils.socketpair, type=socket.SOCK_DGRAM)
+ self.assertRaises(ValueError,
+ windows_utils.socketpair, proto=1)
+
+
class PipeTests(unittest.TestCase):
diff --git a/Lib/test/test_copy.py b/Lib/test/test_copy.py
index cde0bae..eb8d18c 100644
--- a/Lib/test/test_copy.py
+++ b/Lib/test/test_copy.py
@@ -98,6 +98,7 @@ class TestCopy(unittest.TestCase):
pass
tests = [None, 42, 2**100, 3.14, True, False, 1j,
"hello", "hello\u1234", f.__code__,
+ b"world", bytes(range(256)),
NewStyle, range(10), Classic, max, WithMetaclass]
for x in tests:
self.assertIs(copy.copy(x), x)
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index 26ed96c..2f89a10 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -124,6 +124,14 @@ class TestMessageAPI(TestEmailBase):
msg.set_payload([])
self.assertEqual(msg.get_payload(), [])
+ def test_attach_when_payload_is_string(self):
+ msg = Message()
+ msg['Content-Type'] = 'multipart/mixed'
+ msg.set_payload('string payload')
+ sub_msg = MIMEMessage(Message())
+ self.assertRaisesRegex(TypeError, "[Aa]ttach.*non-multipart",
+ msg.attach, sub_msg)
+
def test_get_charsets(self):
eq = self.assertEqual
diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py
index 06ad5f2..e797f36 100644
--- a/Lib/test/test_email/test_policy.py
+++ b/Lib/test/test_email/test_policy.py
@@ -319,5 +319,14 @@ class TestPolicyPropagation(unittest.TestCase):
self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
+class TestConcretePolicies(unittest.TestCase):
+
+ def test_header_store_parse_rejects_newlines(self):
+ instance = email.policy.EmailPolicy()
+ self.assertRaises(ValueError,
+ instance.header_store_parse,
+ 'From', 'spam\negg@foo.py')
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_fileinput.py b/Lib/test/test_fileinput.py
index db6082c..eba55c9 100644
--- a/Lib/test/test_fileinput.py
+++ b/Lib/test/test_fileinput.py
@@ -260,6 +260,27 @@ class FileInputTests(unittest.TestCase):
fi.readline()
self.assertTrue(custom_open_hook.invoked, "openhook not invoked")
+ def test_readline(self):
+ with open(TESTFN, 'wb') as f:
+ f.write(b'A\nB\r\nC\r')
+ # Fill TextIOWrapper buffer.
+ f.write(b'123456789\n' * 1000)
+ # Issue #20501: readline() shouldn't read whole file.
+ f.write(b'\x80')
+ self.addCleanup(safe_unlink, TESTFN)
+
+ with FileInput(files=TESTFN,
+ openhook=hook_encoded('ascii'), bufsize=8) as fi:
+ try:
+ self.assertEqual(fi.readline(), 'A\n')
+ self.assertEqual(fi.readline(), 'B\n')
+ self.assertEqual(fi.readline(), 'C\n')
+ except UnicodeDecodeError:
+ self.fail('Read to end of file')
+ with self.assertRaises(UnicodeDecodeError):
+ # Read to the end of file.
+ list(fi)
+
def test_context_manager(self):
try:
t1 = writeTmp(1, ["A\nB\nC"])
@@ -837,6 +858,26 @@ class Test_hook_encoded(unittest.TestCase):
self.assertIs(kwargs.pop('encoding'), encoding)
self.assertFalse(kwargs)
+ def test_modes(self):
+ with open(TESTFN, 'wb') as f:
+ # UTF-7 is a convenient, seldom used encoding
+ f.write(b'A\nB\r\nC\rD+IKw-')
+ self.addCleanup(safe_unlink, TESTFN)
+
+ def check(mode, expected_lines):
+ with FileInput(files=TESTFN, mode=mode,
+ openhook=hook_encoded('utf-7')) as fi:
+ lines = list(fi)
+ self.assertEqual(lines, expected_lines)
+
+ check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
+ with self.assertWarns(DeprecationWarning):
+ check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
+ with self.assertWarns(DeprecationWarning):
+ check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
+ with self.assertRaises(ValueError):
+ check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_genericpath.py b/Lib/test/test_genericpath.py
index e967897..e59ed4d 100644
--- a/Lib/test/test_genericpath.py
+++ b/Lib/test/test_genericpath.py
@@ -329,7 +329,6 @@ class CommonTest(GenericTest):
self.assertEqual(expandvars("$[foo]bar"), "$[foo]bar")
self.assertEqual(expandvars("$bar bar"), "$bar bar")
self.assertEqual(expandvars("$?bar"), "$?bar")
- self.assertEqual(expandvars("${foo}bar"), "barbar")
self.assertEqual(expandvars("$foo}bar"), "bar}bar")
self.assertEqual(expandvars("${foo"), "${foo")
self.assertEqual(expandvars("${{foo}}"), "baz1}")
@@ -342,13 +341,40 @@ class CommonTest(GenericTest):
self.assertEqual(expandvars(b"$[foo]bar"), b"$[foo]bar")
self.assertEqual(expandvars(b"$bar bar"), b"$bar bar")
self.assertEqual(expandvars(b"$?bar"), b"$?bar")
- self.assertEqual(expandvars(b"${foo}bar"), b"barbar")
self.assertEqual(expandvars(b"$foo}bar"), b"bar}bar")
self.assertEqual(expandvars(b"${foo"), b"${foo")
self.assertEqual(expandvars(b"${{foo}}"), b"baz1}")
self.assertEqual(expandvars(b"$foo$foo"), b"barbar")
self.assertEqual(expandvars(b"$bar$bar"), b"$bar$bar")
+ @unittest.skipUnless(support.FS_NONASCII, 'need support.FS_NONASCII')
+ def test_expandvars_nonascii(self):
+ if self.pathmodule.__name__ == 'macpath':
+ self.skipTest('macpath.expandvars is a stub')
+ expandvars = self.pathmodule.expandvars
+ def check(value, expected):
+ self.assertEqual(expandvars(value), expected)
+ with support.EnvironmentVarGuard() as env:
+ env.clear()
+ nonascii = support.FS_NONASCII
+ env['spam'] = nonascii
+ env[nonascii] = 'ham' + nonascii
+ check(nonascii, nonascii)
+ check('$spam bar', '%s bar' % nonascii)
+ check('${spam}bar', '%sbar' % nonascii)
+ check('${%s}bar' % nonascii, 'ham%sbar' % nonascii)
+ check('$bar%s bar' % nonascii, '$bar%s bar' % nonascii)
+ check('$spam}bar', '%s}bar' % nonascii)
+
+ check(os.fsencode(nonascii), os.fsencode(nonascii))
+ check(b'$spam bar', os.fsencode('%s bar' % nonascii))
+ check(b'${spam}bar', os.fsencode('%sbar' % nonascii))
+ check(os.fsencode('${%s}bar' % nonascii),
+ os.fsencode('ham%sbar' % nonascii))
+ check(os.fsencode('$bar%s bar' % nonascii),
+ os.fsencode('$bar%s bar' % nonascii))
+ check(b'$spam}bar', os.fsencode('%s}bar' % nonascii))
+
def test_abspath(self):
self.assertIn("foo", self.pathmodule.abspath("foo"))
with warnings.catch_warnings():
diff --git a/Lib/test/test_grammar.py b/Lib/test/test_grammar.py
index 9da6338..bba8820 100644
--- a/Lib/test/test_grammar.py
+++ b/Lib/test/test_grammar.py
@@ -319,8 +319,8 @@ class GrammarTests(unittest.TestCase):
def f(self, *, __kw:1):
pass
class Ham(Spam): pass
- self.assertEquals(Spam.f.__annotations__, {'_Spam__kw': 1})
- self.assertEquals(Ham.f.__annotations__, {'_Spam__kw': 1})
+ self.assertEqual(Spam.f.__annotations__, {'_Spam__kw': 1})
+ self.assertEqual(Ham.f.__annotations__, {'_Spam__kw': 1})
# Check for SF Bug #1697248 - mixing decorators and a return annotation
def null(x): return x
@null
diff --git a/Lib/test/test_gzip.py b/Lib/test/test_gzip.py
index 5289407..b7a7e03 100644
--- a/Lib/test/test_gzip.py
+++ b/Lib/test/test_gzip.py
@@ -421,6 +421,13 @@ class TestGzip(BaseTest):
with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
self.assertEqual(f.read(), b'Test')
+ def test_prepend_error(self):
+ # See issue #20875
+ with gzip.open(self.filename, "wb") as f:
+ f.write(data1)
+ with gzip.open(self.filename, "rb") as f:
+ f.fileobj.prepend()
+
class TestOpen(BaseTest):
def test_binary_modes(self):
uncompressed = data1 * 50
diff --git a/Lib/test/test_idle.py b/Lib/test/test_idle.py
index 819151e..7770ee5 100644
--- a/Lib/test/test_idle.py
+++ b/Lib/test/test_idle.py
@@ -14,6 +14,7 @@ if use_resources and 'gui' in use_resources:
try:
root = tk.Tk()
root.destroy()
+ del root
except tk.TclError:
while 'gui' in use_resources:
use_resources.remove('gui')
diff --git a/Lib/test/test_importlib/source/test_file_loader.py b/Lib/test/test_importlib/source/test_file_loader.py
index 25a3dae..2d415f9 100644
--- a/Lib/test/test_importlib/source/test_file_loader.py
+++ b/Lib/test/test_importlib/source/test_file_loader.py
@@ -190,6 +190,7 @@ class SimpleTest(abc.LoaderTests):
if os.path.exists(pycache):
shutil.rmtree(pycache)
+ @source_util.writes_bytecode_files
def test_timestamp_overflow(self):
# When a modification timestamp is larger than 2**32, it should be
# truncated rather than raise an OverflowError.
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 4182da3..267537f 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -855,6 +855,16 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
bufio.__init__(rawio)
self.assertEqual(b"abc", bufio.read())
+ def test_uninitialized(self):
+ bufio = self.tp.__new__(self.tp)
+ del bufio
+ bufio = self.tp.__new__(self.tp)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ bufio.read, 0)
+ bufio.__init__(self.MockRawIO())
+ self.assertEqual(bufio.read(0), b'')
+
def test_read(self):
for arg in (None, 7):
rawio = self.MockRawIO((b"abc", b"d", b"efg"))
@@ -1106,6 +1116,16 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
bufio.flush()
self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
+ def test_uninitialized(self):
+ bufio = self.tp.__new__(self.tp)
+ del bufio
+ bufio = self.tp.__new__(self.tp)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ bufio.write, b'')
+ bufio.__init__(self.MockRawIO())
+ self.assertEqual(bufio.write(b''), 0)
+
def test_detach_flush(self):
raw = self.MockRawIO()
buf = self.tp(raw)
@@ -1390,6 +1410,20 @@ class BufferedRWPairTest(unittest.TestCase):
pair = self.tp(self.MockRawIO(), self.MockRawIO())
self.assertFalse(pair.closed)
+ def test_uninitialized(self):
+ pair = self.tp.__new__(self.tp)
+ del pair
+ pair = self.tp.__new__(self.tp)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ pair.read, 0)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ pair.write, b'')
+ pair.__init__(self.MockRawIO(), self.MockRawIO())
+ self.assertEqual(pair.read(0), b'')
+ self.assertEqual(pair.write(b''), 0)
+
def test_detach(self):
pair = self.tp(self.MockRawIO(), self.MockRawIO())
self.assertRaises(self.UnsupportedOperation, pair.detach)
@@ -1516,6 +1550,10 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
BufferedReaderTest.test_constructor(self)
BufferedWriterTest.test_constructor(self)
+ def test_uninitialized(self):
+ BufferedReaderTest.test_uninitialized(self)
+ BufferedWriterTest.test_uninitialized(self)
+
def test_read_and_write(self):
raw = self.MockRawIO((b"asdf", b"ghjk"))
rw = self.tp(raw, 8)
diff --git a/Lib/test/test_modulefinder.py b/Lib/test/test_modulefinder.py
index 53ea232..ed30d6f 100644
--- a/Lib/test/test_modulefinder.py
+++ b/Lib/test/test_modulefinder.py
@@ -1,5 +1,7 @@
import os
import errno
+import importlib.machinery
+import py_compile
import shutil
import unittest
import tempfile
@@ -208,6 +210,14 @@ a/module.py
from . import *
"""]
+bytecode_test = [
+ "a",
+ ["a"],
+ [],
+ [],
+ ""
+]
+
def open_file(path):
dirname = os.path.dirname(path)
@@ -288,6 +298,16 @@ class ModuleFinderTest(unittest.TestCase):
def test_relative_imports_4(self):
self._do_test(relative_import_test_4)
+ def test_bytecode(self):
+ base_path = os.path.join(TEST_DIR, 'a')
+ source_path = base_path + importlib.machinery.SOURCE_SUFFIXES[0]
+ bytecode_path = base_path + importlib.machinery.BYTECODE_SUFFIXES[0]
+ with open_file(source_path) as file:
+ file.write('testing_modulefinder = True\n')
+ py_compile.compile(source_path, cfile=bytecode_path)
+ os.remove(source_path)
+ self._do_test(bytecode_test)
+
def test_main():
support.run_unittest(ModuleFinderTest)
diff --git a/Lib/test/test_ntpath.py b/Lib/test/test_ntpath.py
index e789e22..000fc75 100644
--- a/Lib/test/test_ntpath.py
+++ b/Lib/test/test_ntpath.py
@@ -22,13 +22,15 @@ def tester(fn, wantResult):
fn = fn.replace('["', '[b"')
fn = fn.replace(", '", ", b'")
fn = fn.replace(', "', ', b"')
+ fn = os.fsencode(fn).decode('latin1')
+ fn = fn.encode('ascii', 'backslashreplace').decode('ascii')
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
gotResult = eval(fn)
if isinstance(wantResult, str):
- wantResult = wantResult.encode('ascii')
+ wantResult = os.fsencode(wantResult)
elif isinstance(wantResult, tuple):
- wantResult = tuple(r.encode('ascii') for r in wantResult)
+ wantResult = tuple(os.fsencode(r) for r in wantResult)
gotResult = eval(fn)
if wantResult != gotResult:
@@ -223,7 +225,6 @@ class TestNtpath(unittest.TestCase):
tester('ntpath.expandvars("$[foo]bar")', "$[foo]bar")
tester('ntpath.expandvars("$bar bar")', "$bar bar")
tester('ntpath.expandvars("$?bar")', "$?bar")
- tester('ntpath.expandvars("${foo}bar")', "barbar")
tester('ntpath.expandvars("$foo}bar")', "bar}bar")
tester('ntpath.expandvars("${foo")', "${foo")
tester('ntpath.expandvars("${{foo}}")', "baz1}")
@@ -237,6 +238,26 @@ class TestNtpath(unittest.TestCase):
tester('ntpath.expandvars("%foo%%bar")', "bar%bar")
tester('ntpath.expandvars("\'%foo%\'%bar")', "\'%foo%\'%bar")
+ @unittest.skipUnless(support.FS_NONASCII, 'need support.FS_NONASCII')
+ def test_expandvars_nonascii(self):
+ def check(value, expected):
+ tester('ntpath.expandvars(%r)' % value, expected)
+ with support.EnvironmentVarGuard() as env:
+ env.clear()
+ nonascii = support.FS_NONASCII
+ env['spam'] = nonascii
+ env[nonascii] = 'ham' + nonascii
+ check('$spam bar', '%s bar' % nonascii)
+ check('$%s bar' % nonascii, '$%s bar' % nonascii)
+ check('${spam}bar', '%sbar' % nonascii)
+ check('${%s}bar' % nonascii, 'ham%sbar' % nonascii)
+ check('$spam}bar', '%s}bar' % nonascii)
+ check('$%s}bar' % nonascii, '$%s}bar' % nonascii)
+ check('%spam% bar', '%s bar' % nonascii)
+ check('%{}% bar'.format(nonascii), 'ham%s bar' % nonascii)
+ check('%spam%bar', '%sbar' % nonascii)
+ check('%{}%bar'.format(nonascii), 'ham%sbar' % nonascii)
+
def test_abspath(self):
# ntpath.abspath() can only be used on a system with the "nt" module
# (reasonably), so we protect this test with "import nt". This allows
diff --git a/Lib/test/test_pkgutil.py b/Lib/test/test_pkgutil.py
index aafdf3c..c4410a9 100644
--- a/Lib/test/test_pkgutil.py
+++ b/Lib/test/test_pkgutil.py
@@ -368,6 +368,11 @@ class ImportlibMigrationTests(unittest.TestCase):
def test_main():
run_unittest(PkgutilTests, PkgutilPEP302Tests, ExtendPathTests,
NestedNamespacePackageTest, ImportlibMigrationTests)
+ # this is necessary if test is run repeated (like when finding leaks)
+ import zipimport
+ import importlib
+ zipimport._zip_directory_cache.clear()
+ importlib.invalidate_caches()
if __name__ == '__main__':
diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py
index 009f29d..9a5ee91 100644
--- a/Lib/test/test_posix.py
+++ b/Lib/test/test_posix.py
@@ -1161,7 +1161,7 @@ class PosixGroupsTester(unittest.TestCase):
def test_initgroups(self):
# find missing group
- g = max(self.saved_groups) + 1
+ g = max(self.saved_groups or [0]) + 1
name = pwd.getpwuid(posix.getuid()).pw_name
posix.initgroups(name, g)
self.assertIn(g, posix.getgroups())
diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py
index 81b58a2..f26fb15 100644
--- a/Lib/test/test_pydoc.py
+++ b/Lib/test/test_pydoc.py
@@ -387,6 +387,16 @@ class PydocDocTest(unittest.TestCase):
print_diffs(expected_text, result)
self.fail("outputs are not equal, see diff above")
+ def test_text_enum_member_with_value_zero(self):
+ # Test issue #20654 to ensure enum member with value 0 can be
+ # displayed. It used to throw KeyError: 'zero'.
+ import enum
+ class BinaryInteger(enum.IntEnum):
+ zero = 0
+ one = 1
+ doc = pydoc.render_doc(BinaryInteger)
+ self.assertIn('<BinaryInteger.zero: 0>', doc)
+
def test_issue8225(self):
# Test issue8225 to ensure no doc link appears for xml.etree
result, doc_loc = get_pydoc_text(xml.etree)
diff --git a/Lib/test/test_range.py b/Lib/test/test_range.py
index d9d4cf8..fecda1b 100644
--- a/Lib/test/test_range.py
+++ b/Lib/test/test_range.py
@@ -380,6 +380,30 @@ class RangeTest(unittest.TestCase):
it = pickle.loads(d)
self.assertEqual(list(it), data[1:])
+ def test_exhausted_iterator_pickling(self):
+ r = range(2**65, 2**65+2)
+ i = iter(r)
+ while True:
+ r = next(i)
+ if r == 2**65+1:
+ break
+ d = pickle.dumps(i)
+ i2 = pickle.loads(d)
+ self.assertEqual(list(i), [])
+ self.assertEqual(list(i2), [])
+
+ def test_large_exhausted_iterator_pickling(self):
+ r = range(20)
+ i = iter(r)
+ while True:
+ r = next(i)
+ if r == 19:
+ break
+ d = pickle.dumps(i)
+ i2 = pickle.loads(d)
+ self.assertEqual(list(i), [])
+ self.assertEqual(list(i2), [])
+
def test_odd_bug(self):
# This used to raise a "SystemError: NULL result without error"
# because the range validation step was eating the exception
diff --git a/Lib/test/test_re.py b/Lib/test/test_re.py
index a229e23..33ccd15 100644
--- a/Lib/test/test_re.py
+++ b/Lib/test/test_re.py
@@ -1205,6 +1205,24 @@ class ReTests(unittest.TestCase):
self.assertEqual(out.getvalue().splitlines(),
['literal 102 ', 'literal 111 ', 'literal 111 '])
+ def test_keyword_parameters(self):
+ # Issue #20283: Accepting the string keyword parameter.
+ pat = re.compile(r'(ab)')
+ self.assertEqual(
+ pat.match(string='abracadabra', pos=7, endpos=10).span(), (7, 9))
+ self.assertEqual(
+ pat.fullmatch(string='abracadabra', pos=7, endpos=9).span(), (7, 9))
+ self.assertEqual(
+ pat.search(string='abracadabra', pos=3, endpos=10).span(), (7, 9))
+ self.assertEqual(
+ pat.findall(string='abracadabra', pos=3, endpos=10), ['ab'])
+ self.assertEqual(
+ pat.split(string='abracadabra', maxsplit=1),
+ ['', 'ab', 'racadabra'])
+ self.assertEqual(
+ pat.scanner(string='abracadabra', pos=3, endpos=10).search().span(),
+ (7, 9))
+
class PatternReprTests(unittest.TestCase):
def check(self, pattern, expected):
diff --git a/Lib/test/test_robotparser.py b/Lib/test/test_robotparser.py
index d1dfd9e..ebc819c 100644
--- a/Lib/test/test_robotparser.py
+++ b/Lib/test/test_robotparser.py
@@ -275,6 +275,7 @@ class NetworkTestCase(unittest.TestCase):
self.skipTest('%s is unavailable' % url)
self.assertEqual(parser.can_fetch("*", robots_url), False)
+ @unittest.skip('does not handle the gzip encoding delivered by pydotorg')
def testPythonOrg(self):
support.requires('network')
with support.transient_internet('www.python.org'):
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 1f47b03..a483fe1 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -1492,6 +1492,15 @@ class TestMove(unittest.TestCase):
# Move a dir inside an existing dir on another filesystem.
self.test_move_dir_to_dir()
+ def test_move_dir_sep_to_dir(self):
+ self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
+ os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
+
+ @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
+ def test_move_dir_altsep_to_dir(self):
+ self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
+ os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
+
def test_existing_file_inside_dest_dir(self):
# A file with the same name inside the destination dir already exists.
with open(self.dst_file, "wb"):
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 2f5ed25..e94f539 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -1234,9 +1234,15 @@ class GeneralModuleTests(unittest.TestCase):
# Issue #6697.
self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
- # Issue 17269
+ # Issue 17269: test workaround for OS X platform bug segfault
if hasattr(socket, 'AI_NUMERICSERV'):
- socket.getaddrinfo("localhost", None, 0, 0, 0, socket.AI_NUMERICSERV)
+ try:
+ # The arguments here are undefined and the call may succeed
+ # or fail. All we care here is that it doesn't segfault.
+ socket.getaddrinfo("localhost", None, 0, 0, 0,
+ socket.AI_NUMERICSERV)
+ except socket.gaierror:
+ pass
def test_getnameinfo(self):
# only IP addresses are allowed
diff --git a/Lib/test/test_source_encoding.py b/Lib/test/test_source_encoding.py
index cd9d2b3..0c41e50 100644
--- a/Lib/test/test_source_encoding.py
+++ b/Lib/test/test_source_encoding.py
@@ -5,6 +5,7 @@ from test.support import TESTFN, unlink, unload
import importlib
import os
import sys
+import subprocess
class SourceEncodingTest(unittest.TestCase):
@@ -58,6 +59,15 @@ class SourceEncodingTest(unittest.TestCase):
# two bytes in common with the UTF-8 BOM
self.assertRaises(SyntaxError, eval, b'\xef\xbb\x20')
+ def test_20731(self):
+ sub = subprocess.Popen([sys.executable,
+ os.path.join(os.path.dirname(__file__),
+ 'coding20731.py')],
+ stderr=subprocess.PIPE)
+ err = sub.communicate()[1]
+ self.assertEqual(sub.returncode, 0)
+ self.assertNotIn(b'SyntaxError', err)
+
def test_error_message(self):
compile(b'# -*- coding: iso-8859-15 -*-\n', 'dummy', 'exec')
compile(b'\xef\xbb\xbf\n', 'dummy', 'exec')
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 70e6396..9bd0a01 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -102,7 +102,7 @@ class TestSupport(unittest.TestCase):
self.assertTrue(os.path.isdir(path))
self.assertFalse(os.path.isdir(path))
finally:
- shutil.rmtree(parent_dir)
+ support.rmtree(parent_dir)
def test_temp_dir__path_none(self):
"""Test passing no path."""
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index e25f296..5a9699f 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -1,4 +1,5 @@
import unittest, test.support
+from test.script_helper import assert_python_ok, assert_python_failure
import sys, io, os
import struct
import subprocess
@@ -89,74 +90,54 @@ class SysModuleTest(unittest.TestCase):
# Python/pythonrun.c::PyErr_PrintEx() is tricky.
def test_exit(self):
-
+ # call with two arguments
self.assertRaises(TypeError, sys.exit, 42, 42)
# call without argument
- try:
- sys.exit(0)
- except SystemExit as exc:
- self.assertEqual(exc.code, 0)
- except:
- self.fail("wrong exception")
- else:
- self.fail("no exception")
+ with self.assertRaises(SystemExit) as cm:
+ sys.exit()
+ self.assertIsNone(cm.exception.code)
- # call with tuple argument with one entry
- # entry will be unpacked
- try:
- sys.exit(42)
- except SystemExit as exc:
- self.assertEqual(exc.code, 42)
- except:
- self.fail("wrong exception")
- else:
- self.fail("no exception")
+ rc, out, err = assert_python_ok('-c', 'import sys; sys.exit()')
+ self.assertEqual(rc, 0)
+ self.assertEqual(out, b'')
+ self.assertEqual(err, b'')
# call with integer argument
- try:
+ with self.assertRaises(SystemExit) as cm:
+ sys.exit(42)
+ self.assertEqual(cm.exception.code, 42)
+
+ # call with tuple argument with one entry
+ # entry will be unpacked
+ with self.assertRaises(SystemExit) as cm:
sys.exit((42,))
- except SystemExit as exc:
- self.assertEqual(exc.code, 42)
- except:
- self.fail("wrong exception")
- else:
- self.fail("no exception")
+ self.assertEqual(cm.exception.code, 42)
# call with string argument
- try:
+ with self.assertRaises(SystemExit) as cm:
sys.exit("exit")
- except SystemExit as exc:
- self.assertEqual(exc.code, "exit")
- except:
- self.fail("wrong exception")
- else:
- self.fail("no exception")
+ self.assertEqual(cm.exception.code, "exit")
# call with tuple argument with two entries
- try:
+ with self.assertRaises(SystemExit) as cm:
sys.exit((17, 23))
- except SystemExit as exc:
- self.assertEqual(exc.code, (17, 23))
- except:
- self.fail("wrong exception")
- else:
- self.fail("no exception")
+ self.assertEqual(cm.exception.code, (17, 23))
# test that the exit machinery handles SystemExits properly
- rc = subprocess.call([sys.executable, "-c",
- "raise SystemExit(47)"])
+ rc, out, err = assert_python_failure('-c', 'raise SystemExit(47)')
self.assertEqual(rc, 47)
+ self.assertEqual(out, b'')
+ self.assertEqual(err, b'')
- def check_exit_message(code, expected, env=None):
- process = subprocess.Popen([sys.executable, "-c", code],
- stderr=subprocess.PIPE, env=env)
- stdout, stderr = process.communicate()
- self.assertEqual(process.returncode, 1)
- self.assertTrue(stderr.startswith(expected),
- "%s doesn't start with %s" % (ascii(stderr), ascii(expected)))
+ def check_exit_message(code, expected, **env_vars):
+ rc, out, err = assert_python_failure('-c', code, **env_vars)
+ self.assertEqual(rc, 1)
+ self.assertEqual(out, b'')
+ self.assertTrue(err.startswith(expected),
+ "%s doesn't start with %s" % (ascii(err), ascii(expected)))
- # test that stderr buffer if flushed before the exit message is written
+ # test that stderr buffer is flushed before the exit message is written
# into stderr
check_exit_message(
r'import sys; sys.stderr.write("unflushed,"); sys.exit("message")',
@@ -170,11 +151,9 @@ class SysModuleTest(unittest.TestCase):
# test that the unicode message is encoded to the stderr encoding
# instead of the default encoding (utf8)
- env = os.environ.copy()
- env['PYTHONIOENCODING'] = 'latin-1'
check_exit_message(
r'import sys; sys.exit("h\xe9")',
- b"h\xe9", env=env)
+ b"h\xe9", PYTHONIOENCODING='latin-1')
def test_getdefaultencoding(self):
self.assertRaises(TypeError, sys.getdefaultencoding, 42)
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index ab88be4..92f8bfe 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -240,14 +240,16 @@ class ListTest(ReadTest, unittest.TestCase):
self.assertIn(b'ustar/dirtype/', out)
self.assertIn(b'ustar/dirtype-with-size/', out)
# Make sure it is able to print unencodable characters
- self.assertIn(br'ustar/umlauts-'
- br'\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf', out)
- self.assertIn(br'misc/regtype-hpux-signed-chksum-'
- br'\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf', out)
- self.assertIn(br'misc/regtype-old-v7-signed-chksum-'
- br'\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf', out)
- self.assertIn(br'pax/bad-pax-\udce4\udcf6\udcfc', out)
- self.assertIn(br'pax/hdrcharset-\udce4\udcf6\udcfc', out)
+ def conv(b):
+ s = b.decode(self.tar.encoding, 'surrogateescape')
+ return s.encode('ascii', 'backslashreplace')
+ self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
+ self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
+ b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
+ self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
+ b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
+ self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
+ self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
# Make sure it prints files separated by one newline without any
# 'ls -l'-like accessories if verbose flag is not being used
# ...
diff --git a/Lib/test/test_tcl.py b/Lib/test/test_tcl.py
index c0c6341..d12fb22 100644
--- a/Lib/test/test_tcl.py
+++ b/Lib/test/test_tcl.py
@@ -376,6 +376,7 @@ class TclTest(unittest.TestCase):
result = arg
return arg
self.interp.createcommand('testfunc', testfunc)
+ self.addCleanup(self.interp.tk.deletecommand, 'testfunc')
def check(value, expected, eq=self.assertEqual):
r = self.interp.call('testfunc', value)
self.assertIsInstance(result, str)
diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py
index b1004e6..9d92742 100644
--- a/Lib/test/test_threadsignals.py
+++ b/Lib/test/test_threadsignals.py
@@ -74,6 +74,9 @@ class ThreadSignals(unittest.TestCase):
@unittest.skipIf(USING_PTHREAD_COND,
'POSIX condition variables cannot be interrupted')
+ # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
+ @unittest.skipIf(sys.platform.startswith('openbsd'),
+ 'lock cannot be interrupted on OpenBSD')
def test_lock_acquire_interruption(self):
# Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
# in a deadlock.
@@ -97,6 +100,9 @@ class ThreadSignals(unittest.TestCase):
@unittest.skipIf(USING_PTHREAD_COND,
'POSIX condition variables cannot be interrupted')
+ # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
+ @unittest.skipIf(sys.platform.startswith('openbsd'),
+ 'lock cannot be interrupted on OpenBSD')
def test_rlock_acquire_interruption(self):
# Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
# in a deadlock.
diff --git a/Lib/test/test_time.py b/Lib/test/test_time.py
index b8721a2..be7ddcc 100644
--- a/Lib/test/test_time.py
+++ b/Lib/test/test_time.py
@@ -14,6 +14,8 @@ except ImportError:
SIZEOF_INT = sysconfig.get_config_var('SIZEOF_INT') or 4
TIME_MAXYEAR = (1 << 8 * SIZEOF_INT - 1) - 1
TIME_MINYEAR = -TIME_MAXYEAR - 1
+_PyTime_ROUND_DOWN = 0
+_PyTime_ROUND_UP = 1
class TimeTestCase(unittest.TestCase):
@@ -226,7 +228,7 @@ class TimeTestCase(unittest.TestCase):
self.assertEqual(time.ctime(t), 'Sun Sep 16 01:03:52 1973')
t = time.mktime((2000, 1, 1, 0, 0, 0, 0, 0, -1))
self.assertEqual(time.ctime(t), 'Sat Jan 1 00:00:00 2000')
- for year in [-100, 100, 1000, 2000, 10000]:
+ for year in [-100, 100, 1000, 2000, 2050, 10000]:
try:
testval = time.mktime((year, 1, 10) + (0,)*6)
except (ValueError, OverflowError):
@@ -344,6 +346,13 @@ class TimeTestCase(unittest.TestCase):
def test_mktime(self):
# Issue #1726687
for t in (-2, -1, 0, 1):
+ if sys.platform.startswith('aix') and t == -1:
+ # Issue #11188, #19748: mktime() returns -1 on error. On Linux,
+ # the tm_wday field is used as a sentinel () to detect if -1 is
+ # really an error or a valid timestamp. On AIX, tm_wday is
+ # unchanged even on success and so cannot be used as a
+ # sentinel.
+ continue
try:
tt = time.localtime(t)
except (OverflowError, OSError):
@@ -585,58 +594,116 @@ class TestPytime(unittest.TestCase):
@support.cpython_only
def test_time_t(self):
from _testcapi import pytime_object_to_time_t
- for obj, time_t in (
- (0, 0),
- (-1, -1),
- (-1.0, -1),
- (-1.9, -1),
- (1.0, 1),
- (1.9, 1),
+ for obj, time_t, rnd in (
+ # Round towards zero
+ (0, 0, _PyTime_ROUND_DOWN),
+ (-1, -1, _PyTime_ROUND_DOWN),
+ (-1.0, -1, _PyTime_ROUND_DOWN),
+ (-1.9, -1, _PyTime_ROUND_DOWN),
+ (1.0, 1, _PyTime_ROUND_DOWN),
+ (1.9, 1, _PyTime_ROUND_DOWN),
+ # Round away from zero
+ (0, 0, _PyTime_ROUND_UP),
+ (-1, -1, _PyTime_ROUND_UP),
+ (-1.0, -1, _PyTime_ROUND_UP),
+ (-1.9, -2, _PyTime_ROUND_UP),
+ (1.0, 1, _PyTime_ROUND_UP),
+ (1.9, 2, _PyTime_ROUND_UP),
):
- self.assertEqual(pytime_object_to_time_t(obj), time_t)
+ self.assertEqual(pytime_object_to_time_t(obj, rnd), time_t)
+ rnd = _PyTime_ROUND_DOWN
for invalid in self.invalid_values:
- self.assertRaises(OverflowError, pytime_object_to_time_t, invalid)
+ self.assertRaises(OverflowError,
+ pytime_object_to_time_t, invalid, rnd)
@support.cpython_only
def test_timeval(self):
from _testcapi import pytime_object_to_timeval
- for obj, timeval in (
- (0, (0, 0)),
- (-1, (-1, 0)),
- (-1.0, (-1, 0)),
- (1e-6, (0, 1)),
- (-1e-6, (-1, 999999)),
- (-1.2, (-2, 800000)),
- (1.1234560, (1, 123456)),
- (1.1234569, (1, 123456)),
- (-1.1234560, (-2, 876544)),
- (-1.1234561, (-2, 876543)),
+ for obj, timeval, rnd in (
+ # Round towards zero
+ (0, (0, 0), _PyTime_ROUND_DOWN),
+ (-1, (-1, 0), _PyTime_ROUND_DOWN),
+ (-1.0, (-1, 0), _PyTime_ROUND_DOWN),
+ (1e-6, (0, 1), _PyTime_ROUND_DOWN),
+ (1e-7, (0, 0), _PyTime_ROUND_DOWN),
+ (-1e-6, (-1, 999999), _PyTime_ROUND_DOWN),
+ (-1e-7, (-1, 999999), _PyTime_ROUND_DOWN),
+ (-1.2, (-2, 800000), _PyTime_ROUND_DOWN),
+ (0.9999999, (0, 999999), _PyTime_ROUND_DOWN),
+ (0.0000041, (0, 4), _PyTime_ROUND_DOWN),
+ (1.1234560, (1, 123456), _PyTime_ROUND_DOWN),
+ (1.1234569, (1, 123456), _PyTime_ROUND_DOWN),
+ (-0.0000040, (-1, 999996), _PyTime_ROUND_DOWN),
+ (-0.0000041, (-1, 999995), _PyTime_ROUND_DOWN),
+ (-1.1234560, (-2, 876544), _PyTime_ROUND_DOWN),
+ (-1.1234561, (-2, 876543), _PyTime_ROUND_DOWN),
+ # Round away from zero
+ (0, (0, 0), _PyTime_ROUND_UP),
+ (-1, (-1, 0), _PyTime_ROUND_UP),
+ (-1.0, (-1, 0), _PyTime_ROUND_UP),
+ (1e-6, (0, 1), _PyTime_ROUND_UP),
+ (1e-7, (0, 1), _PyTime_ROUND_UP),
+ (-1e-6, (-1, 999999), _PyTime_ROUND_UP),
+ (-1e-7, (-1, 999999), _PyTime_ROUND_UP),
+ (-1.2, (-2, 800000), _PyTime_ROUND_UP),
+ (0.9999999, (1, 0), _PyTime_ROUND_UP),
+ (0.0000041, (0, 5), _PyTime_ROUND_UP),
+ (1.1234560, (1, 123457), _PyTime_ROUND_UP),
+ (1.1234569, (1, 123457), _PyTime_ROUND_UP),
+ (-0.0000040, (-1, 999996), _PyTime_ROUND_UP),
+ (-0.0000041, (-1, 999995), _PyTime_ROUND_UP),
+ (-1.1234560, (-2, 876544), _PyTime_ROUND_UP),
+ (-1.1234561, (-2, 876543), _PyTime_ROUND_UP),
):
- self.assertEqual(pytime_object_to_timeval(obj), timeval)
+ with self.subTest(obj=obj, round=rnd, timeval=timeval):
+ self.assertEqual(pytime_object_to_timeval(obj, rnd), timeval)
+ rnd = _PyTime_ROUND_DOWN
for invalid in self.invalid_values:
- self.assertRaises(OverflowError, pytime_object_to_timeval, invalid)
+ self.assertRaises(OverflowError,
+ pytime_object_to_timeval, invalid, rnd)
@support.cpython_only
def test_timespec(self):
from _testcapi import pytime_object_to_timespec
- for obj, timespec in (
- (0, (0, 0)),
- (-1, (-1, 0)),
- (-1.0, (-1, 0)),
- (1e-9, (0, 1)),
- (-1e-9, (-1, 999999999)),
- (-1.2, (-2, 800000000)),
- (1.1234567890, (1, 123456789)),
- (1.1234567899, (1, 123456789)),
- (-1.1234567890, (-2, 876543211)),
- (-1.1234567891, (-2, 876543210)),
+ for obj, timespec, rnd in (
+ # Round towards zero
+ (0, (0, 0), _PyTime_ROUND_DOWN),
+ (-1, (-1, 0), _PyTime_ROUND_DOWN),
+ (-1.0, (-1, 0), _PyTime_ROUND_DOWN),
+ (1e-9, (0, 1), _PyTime_ROUND_DOWN),
+ (1e-10, (0, 0), _PyTime_ROUND_DOWN),
+ (-1e-9, (-1, 999999999), _PyTime_ROUND_DOWN),
+ (-1e-10, (-1, 999999999), _PyTime_ROUND_DOWN),
+ (-1.2, (-2, 800000000), _PyTime_ROUND_DOWN),
+ (0.9999999999, (0, 999999999), _PyTime_ROUND_DOWN),
+ (1.1234567890, (1, 123456789), _PyTime_ROUND_DOWN),
+ (1.1234567899, (1, 123456789), _PyTime_ROUND_DOWN),
+ (-1.1234567890, (-2, 876543211), _PyTime_ROUND_DOWN),
+ (-1.1234567891, (-2, 876543210), _PyTime_ROUND_DOWN),
+ # Round away from zero
+ (0, (0, 0), _PyTime_ROUND_UP),
+ (-1, (-1, 0), _PyTime_ROUND_UP),
+ (-1.0, (-1, 0), _PyTime_ROUND_UP),
+ (1e-9, (0, 1), _PyTime_ROUND_UP),
+ (1e-10, (0, 1), _PyTime_ROUND_UP),
+ (-1e-9, (-1, 999999999), _PyTime_ROUND_UP),
+ (-1e-10, (-1, 999999999), _PyTime_ROUND_UP),
+ (-1.2, (-2, 800000000), _PyTime_ROUND_UP),
+ (0.9999999999, (1, 0), _PyTime_ROUND_UP),
+ (1.1234567890, (1, 123456790), _PyTime_ROUND_UP),
+ (1.1234567899, (1, 123456790), _PyTime_ROUND_UP),
+ (-1.1234567890, (-2, 876543211), _PyTime_ROUND_UP),
+ (-1.1234567891, (-2, 876543210), _PyTime_ROUND_UP),
):
- self.assertEqual(pytime_object_to_timespec(obj), timespec)
+ with self.subTest(obj=obj, round=rnd, timespec=timespec):
+ self.assertEqual(pytime_object_to_timespec(obj, rnd), timespec)
+ rnd = _PyTime_ROUND_DOWN
for invalid in self.invalid_values:
- self.assertRaises(OverflowError, pytime_object_to_timespec, invalid)
+ self.assertRaises(OverflowError,
+ pytime_object_to_timespec, invalid, rnd)
@unittest.skipUnless(time._STRUCT_TM_ITEMS == 11, "needs tm_zone support")
def test_localtime_timezone(self):
diff --git a/Lib/test/test_tokenize.py b/Lib/test/test_tokenize.py
index 6ed8597..38611a7 100644
--- a/Lib/test/test_tokenize.py
+++ b/Lib/test/test_tokenize.py
@@ -2,7 +2,7 @@ doctests = """
Tests for the tokenize module.
The tests can be really simple. Given a small fragment of source
-code, print out a table with tokens. The ENDMARK is omitted for
+code, print out a table with tokens. The ENDMARKER is omitted for
brevity.
>>> dump_tokens("1 + 1")
@@ -578,9 +578,15 @@ pass the '-ucpu' option to process the full directory.
>>> tempdir = os.path.dirname(f) or os.curdir
>>> testfiles = glob.glob(os.path.join(tempdir, "test*.py"))
-tokenize is broken on test_pep3131.py because regular expressions are broken on
-the obscure unicode identifiers in it. *sigh*
+Tokenize is broken on test_pep3131.py because regular expressions are
+broken on the obscure unicode identifiers in it. *sigh*
+With roundtrip extended to test the 5-tuple mode of untokenize,
+7 more testfiles fail. Remove them also until the failure is diagnosed.
+
>>> testfiles.remove(os.path.join(tempdir, "test_pep3131.py"))
+ >>> for f in ('buffer', 'builtin', 'fileio', 'inspect', 'os', 'platform', 'sys'):
+ ... testfiles.remove(os.path.join(tempdir, "test_%s.py") % f)
+ ...
>>> if not support.is_resource_enabled("cpu"):
... testfiles = random.sample(testfiles, 10)
...
@@ -638,7 +644,7 @@ Legacy unicode literals:
from test import support
from tokenize import (tokenize, _tokenize, untokenize, NUMBER, NAME, OP,
STRING, ENDMARKER, ENCODING, tok_name, detect_encoding,
- open as tokenize_open)
+ open as tokenize_open, Untokenizer)
from io import BytesIO
from unittest import TestCase
import os, sys, glob
@@ -659,21 +665,39 @@ def dump_tokens(s):
def roundtrip(f):
"""
Test roundtrip for `untokenize`. `f` is an open file or a string.
- The source code in f is tokenized, converted back to source code via
- tokenize.untokenize(), and tokenized again from the latter. The test
- fails if the second tokenization doesn't match the first.
+ The source code in f is tokenized to both 5- and 2-tuples.
+ Both sequences are converted back to source code via
+ tokenize.untokenize(), and the latter tokenized again to 2-tuples.
+ The test fails if the 3 pair tokenizations do not match.
+
+ When untokenize bugs are fixed, untokenize with 5-tuples should
+ reproduce code that does not contain a backslash continuation
+ following spaces. A proper test should test this.
+
+ This function would be more useful for correcting bugs if it reported
+ the first point of failure, like assertEqual, rather than just
+ returning False -- or if it were only used in unittests and not
+ doctest and actually used assertEqual.
"""
+ # Get source code and original tokenizations
if isinstance(f, str):
- f = BytesIO(f.encode('utf-8'))
- try:
- token_list = list(tokenize(f.readline))
- finally:
+ code = f.encode('utf-8')
+ else:
+ code = f.read()
f.close()
- tokens1 = [tok[:2] for tok in token_list]
- new_bytes = untokenize(tokens1)
- readline = (line for line in new_bytes.splitlines(keepends=True)).__next__
- tokens2 = [tok[:2] for tok in tokenize(readline)]
- return tokens1 == tokens2
+ readline = iter(code.splitlines(keepends=True)).__next__
+ tokens5 = list(tokenize(readline))
+ tokens2 = [tok[:2] for tok in tokens5]
+ # Reproduce tokens2 from pairs
+ bytes_from2 = untokenize(tokens2)
+ readline2 = iter(bytes_from2.splitlines(keepends=True)).__next__
+ tokens2_from2 = [tok[:2] for tok in tokenize(readline2)]
+ # Reproduce tokens2 from 5-tuples
+ bytes_from5 = untokenize(tokens5)
+ readline5 = iter(bytes_from5.splitlines(keepends=True)).__next__
+ tokens2_from5 = [tok[:2] for tok in tokenize(readline5)]
+ # Compare 3 versions
+ return tokens2 == tokens2_from2 == tokens2_from5
# This is an example from the docs, set up as a doctest.
def decistmt(s):
@@ -1153,6 +1177,47 @@ class TestTokenize(TestCase):
# See http://bugs.python.org/issue16152
self.assertExactTypeEqual('@ ', token.AT)
+class UntokenizeTest(TestCase):
+
+ def test_bad_input_order(self):
+ # raise if previous row
+ u = Untokenizer()
+ u.prev_row = 2
+ u.prev_col = 2
+ with self.assertRaises(ValueError) as cm:
+ u.add_whitespace((1,3))
+ self.assertEqual(cm.exception.args[0],
+ 'start (1,3) precedes previous end (2,2)')
+ # raise if previous column in row
+ self.assertRaises(ValueError, u.add_whitespace, (2,1))
+
+ def test_backslash_continuation(self):
+ # The problem is that <whitespace>\<newline> leaves no token
+ u = Untokenizer()
+ u.prev_row = 1
+ u.prev_col = 1
+ u.tokens = []
+ u.add_whitespace((2, 0))
+ self.assertEqual(u.tokens, ['\\\n'])
+ u.prev_row = 2
+ u.add_whitespace((4, 4))
+ self.assertEqual(u.tokens, ['\\\n', '\\\n\\\n', ' '])
+ self.assertTrue(roundtrip('a\n b\n c\n \\\n c\n'))
+
+ def test_iter_compat(self):
+ u = Untokenizer()
+ token = (NAME, 'Hello')
+ tokens = [(ENCODING, 'utf-8'), token]
+ u.compat(token, iter([]))
+ self.assertEqual(u.tokens, ["Hello "])
+ u = Untokenizer()
+ self.assertEqual(u.untokenize(iter([token])), 'Hello ')
+ u = Untokenizer()
+ self.assertEqual(u.untokenize(iter(tokens)), 'Hello ')
+ self.assertEqual(u.encoding, 'utf-8')
+ self.assertEqual(untokenize(iter(tokens)), b'Hello ')
+
+
__test__ = {"doctests" : doctests, 'decistmt': decistmt}
def test_main():
@@ -1162,6 +1227,7 @@ def test_main():
support.run_unittest(Test_Tokenize)
support.run_unittest(TestDetectEncoding)
support.run_unittest(TestTokenize)
+ support.run_unittest(UntokenizeTest)
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py
index d1e5aef..c953885 100644
--- a/Lib/test/test_tracemalloc.py
+++ b/Lib/test/test_tracemalloc.py
@@ -346,6 +346,8 @@ class TestSnapshot(unittest.TestCase):
self.assertIsNot(snapshot5.traces, snapshot.traces)
self.assertEqual(snapshot5.traces, snapshot.traces)
+ self.assertRaises(TypeError, snapshot.filter_traces, filter1)
+
def test_snapshot_group_by_line(self):
snapshot, snapshot2 = create_snapshots()
tb_0 = traceback_lineno('<unknown>', 0)
diff --git a/Lib/test/test_types.py b/Lib/test/test_types.py
index ec10752..18e6b0a 100644
--- a/Lib/test/test_types.py
+++ b/Lib/test/test_types.py
@@ -1,6 +1,6 @@
# Python test set -- part 6, built-in types
-from test.support import run_unittest, run_with_locale
+from test.support import run_unittest, run_with_locale, cpython_only
import collections
import pickle
import locale
@@ -1170,9 +1170,31 @@ class SimpleNamespaceTests(unittest.TestCase):
self.assertEqual(ns, ns_roundtrip, pname)
+class SharedKeyTests(unittest.TestCase):
+
+ @cpython_only
+ def test_subclasses(self):
+ # Verify that subclasses can share keys (per PEP 412)
+ class A:
+ pass
+ class B(A):
+ pass
+
+ a, b = A(), B()
+ self.assertEqual(sys.getsizeof(vars(a)), sys.getsizeof(vars(b)))
+ self.assertLess(sys.getsizeof(vars(a)), sys.getsizeof({}))
+ a.x, a.y, a.z, a.w = range(4)
+ self.assertNotEqual(sys.getsizeof(vars(a)), sys.getsizeof(vars(b)))
+ a2 = A()
+ self.assertEqual(sys.getsizeof(vars(a)), sys.getsizeof(vars(a2)))
+ self.assertLess(sys.getsizeof(vars(a)), sys.getsizeof({}))
+ b.u, b.v, b.w, b.t = range(4)
+ self.assertLess(sys.getsizeof(vars(b)), sys.getsizeof({}))
+
+
def test_main():
run_unittest(TypesTests, MappingProxyTests, ClassCreationTests,
- SimpleNamespaceTests)
+ SimpleNamespaceTests, SharedKeyTests)
if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index b32b409..4b92d63 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -1227,7 +1227,8 @@ class HandlerTests(unittest.TestCase):
self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass),
'expected bypass of %s to be True' % host)
# Check hosts that should not trigger the proxy bypass
- for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1', 'test'):
+ for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1',
+ 'notinbypass'):
self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass),
'expected bypass of %s to be False' % host)