diff options
Diffstat (limited to 'Lib/test/test_poll.py')
-rw-r--r-- | Lib/test/test_poll.py | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py new file mode 100644 index 0000000..a06f56b --- /dev/null +++ b/Lib/test/test_poll.py @@ -0,0 +1,171 @@ + +# Test case for the os.poll() function + +import sys, os, select, random +from test.test_support import verbose, TestSkipped, TESTFN + +try: + select.poll +except AttributeError: + raise TestSkipped, "select.poll not defined -- skipping test_poll" + + +def find_ready_matching(ready, flag): + match = [] + for fd, mode in ready: + if mode & flag: + match.append(fd) + return match + +def test_poll1(): + """Basic functional test of poll object + + Create a bunch of pipe and test that poll works with them. + """ + print 'Running poll test 1' + p = select.poll() + + NUM_PIPES = 12 + MSG = " This is a test." + MSG_LEN = len(MSG) + readers = [] + writers = [] + r2w = {} + w2r = {} + + for i in range(NUM_PIPES): + rd, wr = os.pipe() + p.register(rd, select.POLLIN) + p.register(wr, select.POLLOUT) + readers.append(rd) + writers.append(wr) + r2w[rd] = wr + w2r[wr] = rd + + while writers: + ready = p.poll() + ready_writers = find_ready_matching(ready, select.POLLOUT) + if not ready_writers: + raise RuntimeError, "no pipes ready for writing" + wr = random.choice(ready_writers) + os.write(wr, MSG) + + ready = p.poll() + ready_readers = find_ready_matching(ready, select.POLLIN) + if not ready_readers: + raise RuntimeError, "no pipes ready for reading" + rd = random.choice(ready_readers) + buf = os.read(rd, MSG_LEN) + assert len(buf) == MSG_LEN + print buf + os.close(r2w[rd]) + writers.remove(r2w[rd]) + + poll_unit_tests() + print 'Poll test 1 complete' + +def poll_unit_tests(): + # returns NVAL for invalid file descriptor + FD = 42 + try: + os.close(FD) + except OSError: + pass + p = select.poll() + p.register(FD) + r = p.poll() + assert r[0] == (FD, select.POLLNVAL) + + f = open(TESTFN, 'w') + fd = f.fileno() + p = select.poll() + p.register(f) + r = p.poll() + assert r[0][0] == fd + f.close() + r = p.poll() + assert r[0] == (fd, select.POLLNVAL) + os.unlink(TESTFN) + + # type error for invalid arguments + p = select.poll() + try: + p.register(p) + except TypeError: + pass + else: + print "Bogus register call did not raise TypeError" + try: + p.unregister(p) + except TypeError: + pass + else: + print "Bogus unregister call did not raise TypeError" + + # can't unregister non-existent object + p = select.poll() + try: + p.unregister(3) + except KeyError: + pass + else: + print "Bogus unregister call did not raise KeyError" + + # Test error cases + pollster = select.poll() + class Nope: + pass + + class Almost: + def fileno(self): + return 'fileno' + + try: + pollster.register( Nope(), 0 ) + except TypeError: pass + else: print 'expected TypeError exception, not raised' + + try: + pollster.register( Almost(), 0 ) + except TypeError: pass + else: print 'expected TypeError exception, not raised' + + +# Another test case for poll(). This is copied from the test case for +# select(), modified to use poll() instead. + +def test_poll2(): + print 'Running poll test 2' + cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' + p = os.popen(cmd, 'r') + pollster = select.poll() + pollster.register( p, select.POLLIN ) + for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: + if verbose: + print 'timeout =', tout + fdlist = pollster.poll(tout) + if (fdlist == []): + continue + if fdlist[0] == (p.fileno(),select.POLLHUP): + line = p.readline() + if line != "": + print 'error: pipe seems to be closed, but still returns data' + continue + + elif fdlist[0] == (p.fileno(),select.POLLIN): + line = p.readline() + if verbose: + print `line` + if not line: + if verbose: + print 'EOF' + break + continue + else: + print 'Unexpected return value from select.poll:', fdlist + p.close() + print 'Poll test 2 complete' + +test_poll1() +test_poll2() + |