diff options
Diffstat (limited to 'Lib/test/test_concurrent_futures/executor.py')
| -rw-r--r-- | Lib/test/test_concurrent_futures/executor.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py new file mode 100644 index 0000000..36278bd --- /dev/null +++ b/Lib/test/test_concurrent_futures/executor.py @@ -0,0 +1,107 @@ +import threading +import time +import weakref +from concurrent import futures +from test import support + + +def mul(x, y): + return x * y + +def capture(*args, **kwargs): + return args, kwargs + + +class MyObject(object): + def my_method(self): + pass + + +def make_dummy_object(_): + return MyObject() + + +class ExecutorTest: + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_submit(self): + future = self.executor.submit(pow, 2, 8) + self.assertEqual(256, future.result()) + + def test_submit_keyword(self): + future = self.executor.submit(mul, 2, y=8) + self.assertEqual(16, future.result()) + future = self.executor.submit(capture, 1, self=2, fn=3) + self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) + with self.assertRaises(TypeError): + self.executor.submit(fn=capture, arg=1) + with self.assertRaises(TypeError): + self.executor.submit(arg=1) + + def test_map(self): + self.assertEqual( + list(self.executor.map(pow, range(10), range(10))), + list(map(pow, range(10), range(10)))) + + self.assertEqual( + list(self.executor.map(pow, range(10), range(10), chunksize=3)), + list(map(pow, range(10), range(10)))) + + def test_map_exception(self): + i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) + self.assertEqual(i.__next__(), (0, 1)) + self.assertEqual(i.__next__(), (0, 1)) + self.assertRaises(ZeroDivisionError, i.__next__) + + def test_map_timeout(self): + results = [] + try: + for i in self.executor.map(time.sleep, + [0, 0, 6], + timeout=5): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + + self.assertEqual([None, None], results) + + def test_shutdown_race_issue12456(self): + # Issue #12456: race condition at shutdown where trying to post a + # sentinel in the call queue blocks (the queue is full while processes + # have exited). + self.executor.map(str, [2] * (self.worker_count + 1)) + self.executor.shutdown() + + @support.cpython_only + def test_no_stale_references(self): + # Issue #16284: check that the executors don't unnecessarily hang onto + # references. + my_object = MyObject() + my_object_collected = threading.Event() + my_object_callback = weakref.ref( + my_object, lambda obj: my_object_collected.set()) + # Deliberately discarding the future. + self.executor.submit(my_object.my_method) + del my_object + + collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) + self.assertTrue(collected, + "Stale reference not collected within timeout.") + + def test_max_workers_negative(self): + for number in (0, -1): + with self.assertRaisesRegex(ValueError, + "max_workers must be greater " + "than 0"): + self.executor_type(max_workers=number) + + def test_free_reference(self): + # Issue #14406: Result iterator should not keep an internal + # reference to result objects. + for obj in self.executor.map(make_dummy_object, range(10)): + wr = weakref.ref(obj) + del obj + support.gc_collect() # For PyPy or other GCs. + self.assertIsNone(wr()) |
