summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_poll.py
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2013-08-20 17:38:21 (GMT)
committerSerhiy Storchaka <storchaka@gmail.com>2013-08-20 17:38:21 (GMT)
commitb1973c252c2eec757eaa067afaf593c2cc5ea8db (patch)
treeed78f6f23d427c0dd0ee589c308d1b5cbb297a77 /Lib/test/test_poll.py
parentec67d187ee9a86aaf1108643832f69ad3bc0e369 (diff)
downloadcpython-b1973c252c2eec757eaa067afaf593c2cc5ea8db.zip
cpython-b1973c252c2eec757eaa067afaf593c2cc5ea8db.tar.gz
cpython-b1973c252c2eec757eaa067afaf593c2cc5ea8db.tar.bz2
Issue #8865: Concurrent invocation of select.poll.poll() now raises a
RuntimeError exception. Patch by Christian Schubert.
Diffstat (limited to 'Lib/test/test_poll.py')
-rw-r--r--Lib/test/test_poll.py42
1 files changed, 40 insertions, 2 deletions
diff --git a/Lib/test/test_poll.py b/Lib/test/test_poll.py
index f2d1795..f98a280 100644
--- a/Lib/test/test_poll.py
+++ b/Lib/test/test_poll.py
@@ -1,8 +1,16 @@
# Test case for the os.poll() function
-import os, select, random, unittest
+import os
+import random
+import select
import _testcapi
-from test.support import TESTFN, run_unittest
+try:
+ import threading
+except ImportError:
+ threading = None
+import time
+import unittest
+from test.support import TESTFN, run_unittest, reap_threads
try:
select.poll
@@ -160,6 +168,36 @@ class PollTests(unittest.TestCase):
self.assertRaises(OverflowError, pollster.poll, _testcapi.INT_MAX + 1)
self.assertRaises(OverflowError, pollster.poll, _testcapi.UINT_MAX + 1)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ @reap_threads
+ def test_threaded_poll(self):
+ r, w = os.pipe()
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ rfds = []
+ for i in range(10):
+ fd = os.dup(r)
+ self.addCleanup(os.close, fd)
+ rfds.append(fd)
+ pollster = select.poll()
+ for fd in rfds:
+ pollster.register(fd, select.POLLIN)
+
+ t = threading.Thread(target=pollster.poll)
+ t.start()
+ try:
+ time.sleep(0.5)
+ # trigger ufds array reallocation
+ for fd in rfds:
+ pollster.unregister(fd)
+ pollster.register(w, select.POLLOUT)
+ self.assertRaises(RuntimeError, pollster.poll)
+ finally:
+ # and make the call to poll() from the thread return
+ os.write(w, b'spam')
+ t.join()
+
+
def test_main():
run_unittest(PollTests)