diff options
Diffstat (limited to 'Lib/test/test_poll.py')
-rw-r--r-- | Lib/test/test_poll.py | 316 |
1 files changed, 140 insertions, 176 deletions
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py index 3a48bce..60cd3f4 100644 --- a/Lib/test/test_poll.py +++ b/Lib/test/test_poll.py @@ -1,7 +1,7 @@ # Test case for the os.poll() function -import sys, os, select, random -from test.test_support import verify, verbose, TestSkipped, TESTFN +import sys, os, select, random, unittest +from test.test_support import TestSkipped, TESTFN, run_unittest try: select.poll @@ -16,177 +16,141 @@ def find_ready_matching(ready, flag): match.append(fd) return match -def test_poll1(): - """Basic functional test of poll object - - Create a bunch of pipe and test that poll works with them. - """ - print 'Running poll test 1' - p = select.poll() - - NUM_PIPES = 12 - MSG = " This is a test." - MSG_LEN = len(MSG) - readers = [] - writers = [] - r2w = {} - w2r = {} - - for i in range(NUM_PIPES): - rd, wr = os.pipe() - p.register(rd, select.POLLIN) - p.register(wr, select.POLLOUT) - readers.append(rd) - writers.append(wr) - r2w[rd] = wr - w2r[wr] = rd - - while writers: - ready = p.poll() - ready_writers = find_ready_matching(ready, select.POLLOUT) - if not ready_writers: - raise RuntimeError, "no pipes ready for writing" - wr = random.choice(ready_writers) - os.write(wr, MSG) - - ready = p.poll() - ready_readers = find_ready_matching(ready, select.POLLIN) - if not ready_readers: - raise RuntimeError, "no pipes ready for reading" - rd = random.choice(ready_readers) - buf = os.read(rd, MSG_LEN) - verify(len(buf) == MSG_LEN) - print buf - os.close(r2w[rd]) ; os.close( rd ) - p.unregister( r2w[rd] ) - p.unregister( rd ) - writers.remove(r2w[rd]) - - poll_unit_tests() - print 'Poll test 1 complete' - -def poll_unit_tests(): - # returns NVAL for invalid file descriptor - FD = 42 - try: - os.close(FD) - except OSError: - pass - p = select.poll() - p.register(FD) - r = p.poll() - verify(r[0] == (FD, select.POLLNVAL)) - - f = open(TESTFN, 'w') - fd = f.fileno() - p = select.poll() - p.register(f) - r = p.poll() - verify(r[0][0] == fd) - f.close() - r = p.poll() - verify(r[0] == (fd, select.POLLNVAL)) - os.unlink(TESTFN) - - # type error for invalid arguments - p = select.poll() - try: - p.register(p) - except TypeError: - pass - else: - print "Bogus register call did not raise TypeError" - try: - p.unregister(p) - except TypeError: - pass - else: - print "Bogus unregister call did not raise TypeError" - - # can't unregister non-existent object - p = select.poll() - try: - p.unregister(3) - except KeyError: - pass - else: - print "Bogus unregister call did not raise KeyError" - - # Test error cases - pollster = select.poll() - class Nope: - pass - - class Almost: - def fileno(self): - return 'fileno' - - try: - pollster.register( Nope(), 0 ) - except TypeError: pass - else: print 'expected TypeError exception, not raised' - - try: - pollster.register( Almost(), 0 ) - except TypeError: pass - else: print 'expected TypeError exception, not raised' - - -# Another test case for poll(). This is copied from the test case for -# select(), modified to use poll() instead. - -def test_poll2(): - print 'Running poll test 2' - cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' - p = os.popen(cmd, 'r') - pollster = select.poll() - pollster.register( p, select.POLLIN ) - for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: - if verbose: - print 'timeout =', tout - fdlist = pollster.poll(tout) - if (fdlist == []): - continue - fd, flags = fdlist[0] - if flags & select.POLLHUP: - line = p.readline() - if line != "": - print 'error: pipe seems to be closed, but still returns data' - continue - - elif flags & select.POLLIN: - line = p.readline() - if verbose: - print repr(line) - if not line: - if verbose: - print 'EOF' - break - continue - else: - print 'Unexpected return value from select.poll:', fdlist - p.close() - print 'Poll test 2 complete' - -def test_poll3(): - # test int overflow - print 'Running poll test 3' - pollster = select.poll() - pollster.register(1) - - try: - pollster.poll(1L << 64) - except OverflowError: - pass - else: - print 'Expected OverflowError with excessive timeout' - - x = 2 + 3 - if x != 5: - print 'Overflow must have occurred' - print 'Poll test 3 complete' - - -test_poll1() -test_poll2() -test_poll3() +class PollTests(unittest.TestCase): + + def test_poll1(self): + # Basic functional test of poll object + # Create a bunch of pipe and test that poll works with them. + + p = select.poll() + + NUM_PIPES = 12 + MSG = " This is a test." + MSG_LEN = len(MSG) + readers = [] + writers = [] + r2w = {} + w2r = {} + + for i in range(NUM_PIPES): + rd, wr = os.pipe() + p.register(rd, select.POLLIN) + p.register(wr, select.POLLOUT) + readers.append(rd) + writers.append(wr) + r2w[rd] = wr + w2r[wr] = rd + + bufs = [] + + while writers: + ready = p.poll() + ready_writers = find_ready_matching(ready, select.POLLOUT) + if not ready_writers: + raise RuntimeError, "no pipes ready for writing" + wr = random.choice(ready_writers) + os.write(wr, MSG) + + ready = p.poll() + ready_readers = find_ready_matching(ready, select.POLLIN) + if not ready_readers: + raise RuntimeError, "no pipes ready for reading" + rd = random.choice(ready_readers) + buf = os.read(rd, MSG_LEN) + self.assertEqual(len(buf), MSG_LEN) + bufs.append(buf) + os.close(r2w[rd]) ; os.close( rd ) + p.unregister( r2w[rd] ) + p.unregister( rd ) + writers.remove(r2w[rd]) + + self.assertEqual(bufs, [MSG] * NUM_PIPES) + + def poll_unit_tests(self): + # returns NVAL for invalid file descriptor + FD = 42 + try: + os.close(FD) + except OSError: + pass + p = select.poll() + p.register(FD) + r = p.poll() + self.assertEqual(r[0], (FD, select.POLLNVAL)) + + f = open(TESTFN, 'w') + fd = f.fileno() + p = select.poll() + p.register(f) + r = p.poll() + self.assertEqual(r[0][0], fd) + f.close() + r = p.poll() + self.assertEqual(r[0], (fd, select.POLLNVAL)) + os.unlink(TESTFN) + + # type error for invalid arguments + p = select.poll() + self.assertRaises(TypeError, p.register, p) + self.assertRaises(TypeError, p.unregister, p) + + # can't unregister non-existent object + p = select.poll() + self.assertRaises(KeyError, p.unregister, 3) + + # Test error cases + pollster = select.poll() + class Nope: + pass + + class Almost: + def fileno(self): + return 'fileno' + + self.assertRaises(TypeError, pollster.register, Nope(), 0) + self.assertRaises(TypeError, pollster.register, Almost(), 0) + + # Another test case for poll(). This is copied from the test case for + # select(), modified to use poll() instead. + + def test_poll2(self): + cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' + p = os.popen(cmd, 'r') + pollster = select.poll() + pollster.register( p, select.POLLIN ) + for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: + fdlist = pollster.poll(tout) + if (fdlist == []): + continue + fd, flags = fdlist[0] + if flags & select.POLLHUP: + line = p.readline() + if line != "": + self.fail('error: pipe seems to be closed, but still returns data') + continue + + elif flags & select.POLLIN: + line = p.readline() + if not line: + break + continue + else: + self.fail('Unexpected return value from select.poll: %s' % fdlist) + p.close() + + def test_poll3(self): + # test int overflow + pollster = select.poll() + pollster.register(1) + + self.assertRaises(OverflowError, pollster.poll, 1L << 64) + + x = 2 + 3 + if x != 5: + self.fail('Overflow must have occurred') + +def test_main(): + run_unittest(PollTests) + +if __name__ == '__main__': + test_main() |