summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncore.py
diff options
context:
space:
mode:
authorEzio Melotti <ezio.melotti@gmail.com>2010-07-27 22:03:33 (GMT)
committerEzio Melotti <ezio.melotti@gmail.com>2010-07-27 22:03:33 (GMT)
commit63c46403279fcb6a9cae43c475fd2c58e1303b0b (patch)
tree137a6cbec5db2727f39efcc0baa07b087c5e06eb /Lib/test/test_asyncore.py
parentf1046ca8173380e2c320c56e1cdc911493371057 (diff)
downloadcpython-63c46403279fcb6a9cae43c475fd2c58e1303b0b.zip
cpython-63c46403279fcb6a9cae43c475fd2c58e1303b0b.tar.gz
cpython-63c46403279fcb6a9cae43c475fd2c58e1303b0b.tar.bz2
Use proper skips and assert* methods in test_asyncore.
Diffstat (limited to 'Lib/test/test_asyncore.py')
-rw-r--r--Lib/test/test_asyncore.py193
1 files changed, 97 insertions, 96 deletions
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 27a585d..fe820de 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -118,65 +118,65 @@ class HelperFunctionTests(unittest.TestCase):
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
# These constants should be present as long as poll is available
- if hasattr(select, 'poll'):
- def test_readwrite(self):
- # Check that correct methods are called by readwrite()
-
- attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
-
- expected = (
- (select.POLLIN, 'read'),
- (select.POLLPRI, 'expt'),
- (select.POLLOUT, 'write'),
- (select.POLLERR, 'closed'),
- (select.POLLHUP, 'closed'),
- (select.POLLNVAL, 'closed'),
- )
-
- class testobj:
- def __init__(self):
- self.read = False
- self.write = False
- self.closed = False
- self.expt = False
- self.error_handled = False
-
- def handle_read_event(self):
- self.read = True
-
- def handle_write_event(self):
- self.write = True
-
- def handle_close(self):
- self.closed = True
-
- def handle_expt_event(self):
- self.expt = True
-
- def handle_error(self):
- self.error_handled = True
-
- for flag, expectedattr in expected:
- tobj = testobj()
- self.assertEqual(getattr(tobj, expectedattr), False)
- asyncore.readwrite(tobj, flag)
-
- # Only the attribute modified by the routine we expect to be
- # called should be True.
- for attr in attributes:
- self.assertEqual(getattr(tobj, attr), attr==expectedattr)
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore readwrite call
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- self.assertEqual(tr2.error_handled, False)
- asyncore.readwrite(tr2, flag)
- self.assertEqual(tr2.error_handled, True)
+ @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
+ def test_readwrite(self):
+ # Check that correct methods are called by readwrite()
+
+ attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
+
+ expected = (
+ (select.POLLIN, 'read'),
+ (select.POLLPRI, 'expt'),
+ (select.POLLOUT, 'write'),
+ (select.POLLERR, 'closed'),
+ (select.POLLHUP, 'closed'),
+ (select.POLLNVAL, 'closed'),
+ )
+
+ class testobj:
+ def __init__(self):
+ self.read = False
+ self.write = False
+ self.closed = False
+ self.expt = False
+ self.error_handled = False
+
+ def handle_read_event(self):
+ self.read = True
+
+ def handle_write_event(self):
+ self.write = True
+
+ def handle_close(self):
+ self.closed = True
+
+ def handle_expt_event(self):
+ self.expt = True
+
+ def handle_error(self):
+ self.error_handled = True
+
+ for flag, expectedattr in expected:
+ tobj = testobj()
+ self.assertEqual(getattr(tobj, expectedattr), False)
+ asyncore.readwrite(tobj, flag)
+
+ # Only the attribute modified by the routine we expect to be
+ # called should be True.
+ for attr in attributes:
+ self.assertEqual(getattr(tobj, attr), attr==expectedattr)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite call
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ self.assertEqual(tr2.error_handled, False)
+ asyncore.readwrite(tr2, flag)
+ self.assertEqual(tr2.error_handled, True)
def test_closeall(self):
self.closeall_check(False)
@@ -259,7 +259,7 @@ class DispatcherTests(unittest.TestCase):
sys.stderr = stderr
lines = fp.getvalue().splitlines()
- self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
+ self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
def test_log_info(self):
d = asyncore.dispatcher()
@@ -281,7 +281,7 @@ class DispatcherTests(unittest.TestCase):
lines = fp.getvalue().splitlines()
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
- self.assertEquals(lines, expected)
+ self.assertEqual(lines, expected)
def test_unhandled(self):
d = asyncore.dispatcher()
@@ -306,7 +306,7 @@ class DispatcherTests(unittest.TestCase):
'warning: unhandled write event',
'warning: unhandled connect event',
'warning: unhandled accept event']
- self.assertEquals(lines, expected)
+ self.assertEqual(lines, expected)
def test_issue_8594(self):
# XXX - this test is supposed to be removed in next major Python
@@ -322,7 +322,7 @@ class DispatcherTests(unittest.TestCase):
warnings.simplefilter("always")
family = d.family
self.assertEqual(family, socket.AF_INET)
- self.assertTrue(len(w) == 1)
+ self.assertEqual(len(w), 1)
self.assertTrue(issubclass(w[0].category, DeprecationWarning))
def test_strerror(self):
@@ -331,7 +331,7 @@ class DispatcherTests(unittest.TestCase):
if hasattr(os, 'strerror'):
self.assertEqual(err, os.strerror(errno.EPERM))
err = asyncore._strerror(-1)
- self.assertTrue("unknown error" in err.lower())
+ self.assertIn("unknown error", err.lower())
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
@@ -394,38 +394,39 @@ class DispatcherWithSendTests(unittest.TestCase):
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
usepoll = True
-if hasattr(asyncore, 'file_wrapper'):
- class FileWrapperTest(unittest.TestCase):
- def setUp(self):
- self.d = b"It's not dead, it's sleeping!"
- open(TESTFN, 'wb').write(self.d)
-
- def tearDown(self):
- unlink(TESTFN)
-
- def test_recv(self):
- fd = os.open(TESTFN, os.O_RDONLY)
- w = asyncore.file_wrapper(fd)
- os.close(fd)
-
- self.assertNotEqual(w.fd, fd)
- self.assertNotEqual(w.fileno(), fd)
- self.assertEqual(w.recv(13), b"It's not dead")
- self.assertEqual(w.read(6), b", it's")
- w.close()
- self.assertRaises(OSError, w.read, 1)
-
- def test_send(self):
- d1 = b"Come again?"
- d2 = b"I want to buy some cheese."
- fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
- w = asyncore.file_wrapper(fd)
- os.close(fd)
-
- w.write(d1)
- w.send(d2)
- w.close()
- self.assertEqual(open(TESTFN, 'rb').read(), self.d + d1 + d2)
+@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
+ 'asyncore.file_wrapper required')
+class FileWrapperTest(unittest.TestCase):
+ def setUp(self):
+ self.d = b"It's not dead, it's sleeping!"
+ open(TESTFN, 'wb').write(self.d)
+
+ def tearDown(self):
+ unlink(TESTFN)
+
+ def test_recv(self):
+ fd = os.open(TESTFN, os.O_RDONLY)
+ w = asyncore.file_wrapper(fd)
+ os.close(fd)
+
+ self.assertNotEqual(w.fd, fd)
+ self.assertNotEqual(w.fileno(), fd)
+ self.assertEqual(w.recv(13), b"It's not dead")
+ self.assertEqual(w.read(6), b", it's")
+ w.close()
+ self.assertRaises(OSError, w.read, 1)
+
+ def test_send(self):
+ d1 = b"Come again?"
+ d2 = b"I want to buy some cheese."
+ fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
+ w = asyncore.file_wrapper(fd)
+ os.close(fd)
+
+ w.write(d1)
+ w.send(d2)
+ w.close()
+ self.assertEqual(open(TESTFN, 'rb').read(), self.d + d1 + d2)
class BaseTestHandler(asyncore.dispatcher):