diff options
authorAndrew M. Kuchling <>2000-08-25 01:18:45 (GMT)
committerAndrew M. Kuchling <>2000-08-25 01:18:45 (GMT)
commit3227cc8c09296ecde263e23c23f1419a795c42c9 (patch)
parentcf96dc800ecbbb38a71298ee99f3fe1ab80e6c24 (diff)
Test suite for poll() interface (SF patch #100852)
2 files changed, 214 insertions, 0 deletions
diff --git a/Lib/test/output/test_poll b/Lib/test/output/test_poll
new file mode 100644
index 0000000..0c55cdc
--- /dev/null
+++ b/Lib/test/output/test_poll
@@ -0,0 +1,43 @@
+Running poll test 1
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+ This is a test.
+Poll test 1 complete
+Running poll test 2
+timeout = 0
+timeout = 1000
+timeout = 2000
+timeout = 4000
+timeout = 8000
+timeout = 16000
+timeout = -1
+timeout = -1
+timeout = -1
+timeout = -1
+timeout = -1
+timeout = -1
+timeout = -1
+timeout = -1
+timeout = -1
+timeout = -1
+Poll test 2 complete
diff --git a/Lib/test/ b/Lib/test/
new file mode 100644
index 0000000..a06f56b
--- /dev/null
+++ b/Lib/test/
@@ -0,0 +1,171 @@
+# Test case for the os.poll() function
+import sys, os, select, random
+from test.test_support import verbose, TestSkipped, TESTFN
+ select.poll
+except AttributeError:
+ raise TestSkipped, "select.poll not defined -- skipping test_poll"
+def find_ready_matching(ready, flag):
+ match = []
+ for fd, mode in ready:
+ if mode & flag:
+ match.append(fd)
+ return match
+def test_poll1():
+ """Basic functional test of poll object
+ Create a bunch of pipe and test that poll works with them.
+ """
+ print 'Running poll test 1'
+ p = select.poll()
+ NUM_PIPES = 12
+ MSG = " This is a test."
+ MSG_LEN = len(MSG)
+ readers = []
+ writers = []
+ r2w = {}
+ w2r = {}
+ for i in range(NUM_PIPES):
+ rd, wr = os.pipe()
+ p.register(rd, select.POLLIN)
+ p.register(wr, select.POLLOUT)
+ readers.append(rd)
+ writers.append(wr)
+ r2w[rd] = wr
+ w2r[wr] = rd
+ while writers:
+ ready = p.poll()
+ ready_writers = find_ready_matching(ready, select.POLLOUT)
+ if not ready_writers:
+ raise RuntimeError, "no pipes ready for writing"
+ wr = random.choice(ready_writers)
+ os.write(wr, MSG)
+ ready = p.poll()
+ ready_readers = find_ready_matching(ready, select.POLLIN)
+ if not ready_readers:
+ raise RuntimeError, "no pipes ready for reading"
+ rd = random.choice(ready_readers)
+ buf =, MSG_LEN)
+ assert len(buf) == MSG_LEN
+ print buf
+ os.close(r2w[rd])
+ writers.remove(r2w[rd])
+ poll_unit_tests()
+ print 'Poll test 1 complete'
+def poll_unit_tests():
+ # returns NVAL for invalid file descriptor
+ FD = 42
+ try:
+ os.close(FD)
+ except OSError:
+ pass
+ p = select.poll()
+ p.register(FD)
+ r = p.poll()
+ assert r[0] == (FD, select.POLLNVAL)
+ f = open(TESTFN, 'w')
+ fd = f.fileno()
+ p = select.poll()
+ p.register(f)
+ r = p.poll()
+ assert r[0][0] == fd
+ f.close()
+ r = p.poll()
+ assert r[0] == (fd, select.POLLNVAL)
+ os.unlink(TESTFN)
+ # type error for invalid arguments
+ p = select.poll()
+ try:
+ p.register(p)
+ except TypeError:
+ pass
+ else:
+ print "Bogus register call did not raise TypeError"
+ try:
+ p.unregister(p)
+ except TypeError:
+ pass
+ else:
+ print "Bogus unregister call did not raise TypeError"
+ # can't unregister non-existent object
+ p = select.poll()
+ try:
+ p.unregister(3)
+ except KeyError:
+ pass
+ else:
+ print "Bogus unregister call did not raise KeyError"
+ # Test error cases
+ pollster = select.poll()
+ class Nope:
+ pass
+ class Almost:
+ def fileno(self):
+ return 'fileno'
+ try:
+ pollster.register( Nope(), 0 )
+ except TypeError: pass
+ else: print 'expected TypeError exception, not raised'
+ try:
+ pollster.register( Almost(), 0 )
+ except TypeError: pass
+ else: print 'expected TypeError exception, not raised'
+# Another test case for poll(). This is copied from the test case for
+# select(), modified to use poll() instead.
+def test_poll2():
+ print 'Running poll test 2'
+ cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
+ p = os.popen(cmd, 'r')
+ pollster = select.poll()
+ pollster.register( p, select.POLLIN )
+ for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
+ if verbose:
+ print 'timeout =', tout
+ fdlist = pollster.poll(tout)
+ if (fdlist == []):
+ continue
+ if fdlist[0] == (p.fileno(),select.POLLHUP):
+ line = p.readline()
+ if line != "":
+ print 'error: pipe seems to be closed, but still returns data'
+ continue
+ elif fdlist[0] == (p.fileno(),select.POLLIN):
+ line = p.readline()
+ if verbose:
+ print `line`
+ if not line:
+ if verbose:
+ print 'EOF'
+ break
+ continue
+ else:
+ print 'Unexpected return value from select.poll:', fdlist
+ p.close()
+ print 'Poll test 2 complete'