summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_concurrent_futures/executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_concurrent_futures/executor.py')
-rw-r--r--Lib/test/test_concurrent_futures/executor.py107
1 files changed, 107 insertions, 0 deletions
diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py
new file mode 100644
index 0000000..36278bd
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/executor.py
@@ -0,0 +1,107 @@
+import threading
+import time
+import weakref
+from concurrent import futures
+from test import support
+
+
+def mul(x, y):
+ return x * y
+
+def capture(*args, **kwargs):
+ return args, kwargs
+
+
+class MyObject(object):
+ def my_method(self):
+ pass
+
+
+def make_dummy_object(_):
+ return MyObject()
+
+
+class ExecutorTest:
+ # Executor.shutdown() and context manager usage is tested by
+ # ExecutorShutdownTest.
+ def test_submit(self):
+ future = self.executor.submit(pow, 2, 8)
+ self.assertEqual(256, future.result())
+
+ def test_submit_keyword(self):
+ future = self.executor.submit(mul, 2, y=8)
+ self.assertEqual(16, future.result())
+ future = self.executor.submit(capture, 1, self=2, fn=3)
+ self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
+ with self.assertRaises(TypeError):
+ self.executor.submit(fn=capture, arg=1)
+ with self.assertRaises(TypeError):
+ self.executor.submit(arg=1)
+
+ def test_map(self):
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10))),
+ list(map(pow, range(10), range(10))))
+
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10), chunksize=3)),
+ list(map(pow, range(10), range(10))))
+
+ def test_map_exception(self):
+ i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertRaises(ZeroDivisionError, i.__next__)
+
+ def test_map_timeout(self):
+ results = []
+ try:
+ for i in self.executor.map(time.sleep,
+ [0, 0, 6],
+ timeout=5):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
+
+ self.assertEqual([None, None], results)
+
+ def test_shutdown_race_issue12456(self):
+ # Issue #12456: race condition at shutdown where trying to post a
+ # sentinel in the call queue blocks (the queue is full while processes
+ # have exited).
+ self.executor.map(str, [2] * (self.worker_count + 1))
+ self.executor.shutdown()
+
+ @support.cpython_only
+ def test_no_stale_references(self):
+ # Issue #16284: check that the executors don't unnecessarily hang onto
+ # references.
+ my_object = MyObject()
+ my_object_collected = threading.Event()
+ my_object_callback = weakref.ref(
+ my_object, lambda obj: my_object_collected.set())
+ # Deliberately discarding the future.
+ self.executor.submit(my_object.my_method)
+ del my_object
+
+ collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
+ self.assertTrue(collected,
+ "Stale reference not collected within timeout.")
+
+ def test_max_workers_negative(self):
+ for number in (0, -1):
+ with self.assertRaisesRegex(ValueError,
+ "max_workers must be greater "
+ "than 0"):
+ self.executor_type(max_workers=number)
+
+ def test_free_reference(self):
+ # Issue #14406: Result iterator should not keep an internal
+ # reference to result objects.
+ for obj in self.executor.map(make_dummy_object, range(10)):
+ wr = weakref.ref(obj)
+ del obj
+ support.gc_collect() # For PyPy or other GCs.
+ self.assertIsNone(wr())