summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-06-27 20:44:40 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-06-27 20:44:40 (GMT)
commit252d40ef1ee627148ae12ab9cbf812cfeeb27655 (patch)
treef701ae8e1f4f18be57b93f00b0c021c9eff98507 /Lib
parent680241ec995b5ef1a27a7b4e6f42be16ec21dd25 (diff)
downloadcpython-252d40ef1ee627148ae12ab9cbf812cfeeb27655.zip
cpython-252d40ef1ee627148ae12ab9cbf812cfeeb27655.tar.gz
cpython-252d40ef1ee627148ae12ab9cbf812cfeeb27655.tar.bz2
Closes #21582: Cleanup test_asyncore. Patch written by diana.
- Use support.captured_stderr() where appropriate - Removes some "from test.support import xxx" import and uses support.xxx instead.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_asyncore.py54
1 files changed, 18 insertions, 36 deletions
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 084d247..7defe65 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -5,14 +5,12 @@ import os
import socket
import sys
import time
-import warnings
import errno
import struct
+import warnings
from test import support
-from test.support import TESTFN, run_unittest, unlink, HOST, HOSTv6
from io import BytesIO
-from io import StringIO
try:
import threading
@@ -94,7 +92,7 @@ def bind_af_aware(sock, addr):
"""Helper function to bind a socket according to its family."""
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist.
- unlink(addr)
+ support.unlink(addr)
sock.bind(addr)
@@ -257,40 +255,29 @@ class DispatcherTests(unittest.TestCase):
d = asyncore.dispatcher()
# capture output of dispatcher.log() (to stderr)
- fp = StringIO()
- stderr = sys.stderr
l1 = "Lovely spam! Wonderful spam!"
l2 = "I don't like spam!"
- try:
- sys.stderr = fp
+ with support.captured_stderr() as stderr:
d.log(l1)
d.log(l2)
- finally:
- sys.stderr = stderr
- lines = fp.getvalue().splitlines()
+ lines = stderr.getvalue().splitlines()
self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
def test_log_info(self):
d = asyncore.dispatcher()
# capture output of dispatcher.log_info() (to stdout via print)
- fp = StringIO()
- stdout = sys.stdout
l1 = "Have you got anything without spam?"
l2 = "Why can't she have egg bacon spam and sausage?"
l3 = "THAT'S got spam in it!"
- try:
- sys.stdout = fp
+ with support.captured_stdout() as stdout:
d.log_info(l1, 'EGGS')
d.log_info(l2)
d.log_info(l3, 'SPAM')
- finally:
- sys.stdout = stdout
- lines = fp.getvalue().splitlines()
+ lines = stdout.getvalue().splitlines()
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
-
self.assertEqual(lines, expected)
def test_unhandled(self):
@@ -298,18 +285,13 @@ class DispatcherTests(unittest.TestCase):
d.ignore_log_types = ()
# capture output of dispatcher.log_info() (to stdout via print)
- fp = StringIO()
- stdout = sys.stdout
- try:
- sys.stdout = fp
+ with support.captured_stdout() as stdout:
d.handle_expt()
d.handle_read()
d.handle_write()
d.handle_connect()
- finally:
- sys.stdout = stdout
- lines = fp.getvalue().splitlines()
+ lines = stdout.getvalue().splitlines()
expected = ['warning: unhandled incoming priority event',
'warning: unhandled read event',
'warning: unhandled write event',
@@ -378,7 +360,7 @@ class DispatcherWithSendTests(unittest.TestCase):
data = b"Suppose there isn't a 16-ton weight?"
d = dispatcherwithsend_noread()
d.create_socket()
- d.connect((HOST, port))
+ d.connect((support.HOST, port))
# give time for socket to connect
time.sleep(0.1)
@@ -410,14 +392,14 @@ class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
class FileWrapperTest(unittest.TestCase):
def setUp(self):
self.d = b"It's not dead, it's sleeping!"
- with open(TESTFN, 'wb') as file:
+ with open(support.TESTFN, 'wb') as file:
file.write(self.d)
def tearDown(self):
- unlink(TESTFN)
+ support.unlink(support.TESTFN)
def test_recv(self):
- fd = os.open(TESTFN, os.O_RDONLY)
+ fd = os.open(support.TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd)
os.close(fd)
@@ -431,20 +413,20 @@ class FileWrapperTest(unittest.TestCase):
def test_send(self):
d1 = b"Come again?"
d2 = b"I want to buy some cheese."
- fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
+ fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd)
os.close(fd)
w.write(d1)
w.send(d2)
w.close()
- with open(TESTFN, 'rb') as file:
+ with open(support.TESTFN, 'rb') as file:
self.assertEqual(file.read(), self.d + d1 + d2)
@unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
'asyncore.file_dispatcher required')
def test_dispatcher(self):
- fd = os.open(TESTFN, os.O_RDONLY)
+ fd = os.open(support.TESTFN, os.O_RDONLY)
data = []
class FileDispatcher(asyncore.file_dispatcher):
def handle_read(self):
@@ -815,12 +797,12 @@ class BaseTestAPI:
class TestAPI_UseIPv4Sockets(BaseTestAPI):
family = socket.AF_INET
- addr = (HOST, 0)
+ addr = (support.HOST, 0)
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI):
family = socket.AF_INET6
- addr = (HOSTv6, 0)
+ addr = (support.HOSTv6, 0)
@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI):
@@ -829,7 +811,7 @@ class TestAPI_UseUnixSockets(BaseTestAPI):
addr = support.TESTFN
def tearDown(self):
- unlink(self.addr)
+ support.unlink(self.addr)
BaseTestAPI.tearDown(self)
class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):