summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_poll.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_poll.py')
-rw-r--r--Lib/test/test_poll.py58
1 files changed, 30 insertions, 28 deletions
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py
index ef966bf..7ad693d 100644
--- a/Lib/test/test_poll.py
+++ b/Lib/test/test_poll.py
@@ -1,18 +1,20 @@
# Test case for the os.poll() function
import os
-import subprocess
import random
import select
-import threading
+try:
+ import threading
+except ImportError:
+ threading = None
import time
import unittest
-from test.support import TESTFN, run_unittest, reap_threads, cpython_only
+from test.test_support import TESTFN, run_unittest, reap_threads, cpython_only
try:
select.poll
except AttributeError:
- raise unittest.SkipTest("select.poll not defined")
+ raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
def find_ready_matching(ready, flag):
@@ -31,7 +33,7 @@ class PollTests(unittest.TestCase):
p = select.poll()
NUM_PIPES = 12
- MSG = b" This is a test."
+ MSG = " This is a test."
MSG_LEN = len(MSG)
readers = []
writers = []
@@ -54,14 +56,14 @@ class PollTests(unittest.TestCase):
ready = p.poll()
ready_writers = find_ready_matching(ready, select.POLLOUT)
if not ready_writers:
- raise RuntimeError("no pipes ready for writing")
+ raise RuntimeError, "no pipes ready for writing"
wr = random.choice(ready_writers)
os.write(wr, MSG)
ready = p.poll()
ready_readers = find_ready_matching(ready, select.POLLIN)
if not ready_readers:
- raise RuntimeError("no pipes ready for reading")
+ raise RuntimeError, "no pipes ready for reading"
rd = random.choice(ready_readers)
buf = os.read(rd, MSG_LEN)
self.assertEqual(len(buf), MSG_LEN)
@@ -73,22 +75,25 @@ class PollTests(unittest.TestCase):
self.assertEqual(bufs, [MSG] * NUM_PIPES)
- def test_poll_unit_tests(self):
+ def poll_unit_tests(self):
# returns NVAL for invalid file descriptor
- FD, w = os.pipe()
- os.close(FD)
- os.close(w)
+ FD = 42
+ try:
+ os.close(FD)
+ except OSError:
+ pass
p = select.poll()
p.register(FD)
r = p.poll()
self.assertEqual(r[0], (FD, select.POLLNVAL))
- with open(TESTFN, 'w') as f:
- fd = f.fileno()
- p = select.poll()
- p.register(f)
- r = p.poll()
- self.assertEqual(r[0][0], fd)
+ f = open(TESTFN, 'w')
+ fd = f.fileno()
+ p = select.poll()
+ p.register(f)
+ r = p.poll()
+ self.assertEqual(r[0][0], fd)
+ f.close()
r = p.poll()
self.assertEqual(r[0], (fd, select.POLLNVAL))
os.unlink(TESTFN)
@@ -119,11 +124,7 @@ class PollTests(unittest.TestCase):
def test_poll2(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
- proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
- bufsize=0)
- proc.__enter__()
- self.addCleanup(proc.__exit__, None, None, None)
- p = proc.stdout
+ p = os.popen(cmd, 'r')
pollster = select.poll()
pollster.register( p, select.POLLIN )
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
@@ -133,7 +134,7 @@ class PollTests(unittest.TestCase):
fd, flags = fdlist[0]
if flags & select.POLLHUP:
line = p.readline()
- if line != b"":
+ if line != "":
self.fail('error: pipe seems to be closed, but still returns data')
continue
@@ -141,26 +142,26 @@ class PollTests(unittest.TestCase):
line = p.readline()
if not line:
break
- self.assertEqual(line, b'testing...\n')
continue
else:
self.fail('Unexpected return value from select.poll: %s' % fdlist)
+ p.close()
def test_poll3(self):
# test int overflow
pollster = select.poll()
pollster.register(1)
- self.assertRaises(OverflowError, pollster.poll, 1 << 64)
+ self.assertRaises(OverflowError, pollster.poll, 1L << 64)
x = 2 + 3
if x != 5:
self.fail('Overflow must have occurred')
# Issues #15989, #17919
- self.assertRaises(ValueError, pollster.register, 0, -1)
+ self.assertRaises(OverflowError, pollster.register, 0, -1)
self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
- self.assertRaises(ValueError, pollster.modify, 1, -1)
+ self.assertRaises(OverflowError, pollster.modify, 1, -1)
self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
@cpython_only
@@ -175,6 +176,7 @@ class PollTests(unittest.TestCase):
self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
@reap_threads
def test_threaded_poll(self):
r, w = os.pipe()
@@ -206,7 +208,7 @@ class PollTests(unittest.TestCase):
@unittest.skipUnless(threading, 'Threading required for this test.')
@reap_threads
def test_poll_blocks_with_negative_ms(self):
- for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
+ for timeout_ms in [None, -1000, -1, -1.0]:
# Create two file descriptors. This will be used to unlock
# the blocking call to poll.poll inside the thread
r, w = os.pipe()