summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_capi.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_capi.py')
-rw-r--r--Lib/test/test_capi.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index efa7c34..4d37687 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -2,10 +2,14 @@
# these are all functions _testcapi exports whose name begins with 'test_'.
import sys
+import time
+import random
import unittest
+import threading
from test import support
import _testcapi
+
def testfunction(self):
"""some doc"""
return self
@@ -28,6 +32,67 @@ class CAPITest(unittest.TestCase):
self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
+class TestPendingCalls(unittest.TestCase):
+
+ def pendingcalls_submit(self, l, n):
+ def callback():
+ #this function can be interrupted by thread switching so let's
+ #use an atomic operation
+ l.append(None)
+
+ for i in range(n):
+ time.sleep(random.random()*0.02) #0.01 secs on average
+ #try submitting callback until successful.
+ #rely on regular interrupt to flush queue if we are
+ #unsuccessful.
+ while True:
+ if _testcapi._pending_threadfunc(callback):
+ break;
+
+ def pendingcalls_wait(self, l, n):
+ #now, stick around until l[0] has grown to 10
+ count = 0;
+ while len(l) != n:
+ #this busy loop is where we expect to be interrupted to
+ #run our callbacks. Note that callbacks are only run on the
+ #main thread
+ if False and test_support.verbose:
+ print("(%i)"%(len(l),),)
+ for i in range(1000):
+ a = i*i
+ count += 1
+ self.failUnless(count < 10000,
+ "timeout waiting for %i callbacks, got %i"%(n, len(l)))
+ if False and test_support.verbose:
+ print("(%i)"%(len(l),))
+
+ def test_pendingcalls_threaded(self):
+ l = []
+
+ #do every callback on a separate thread
+ n = 32
+ threads = []
+ for i in range(n):
+ t = threading.Thread(target=self.pendingcalls_submit, args = (l, 1))
+ t.start()
+ threads.append(t)
+
+ self.pendingcalls_wait(l, n)
+
+ for t in threads:
+ t.join()
+
+ def test_pendingcalls_non_threaded(self):
+ #again, just using the main thread, likely they will all be dispathced at
+ #once. It is ok to ask for too many, because we loop until we find a slot.
+ #the loop can be interrupted to dispatch.
+ #there are only 32 dispatch slots, so we go for twice that!
+ l = []
+ n = 64
+ self.pendingcalls_submit(l, n)
+ self.pendingcalls_wait(l, n)
+
+
def test_main():
support.run_unittest(CAPITest)
@@ -71,6 +136,8 @@ def test_main():
t.start()
t.join()
+ support.run_unittest(TestPendingCalls)
+
if __name__ == "__main__":
test_main()