diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-06 17:35:31 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-06 17:35:31 (GMT) |
commit | 176f07dadf80c27348486fdd8017fabbb30da842 (patch) | |
tree | d489214386a5a7d5efec5d157be5783554c6cec2 /Lib | |
parent | f068ab830448ea9f78ee0dfbb29c12211d1b6aee (diff) | |
download | cpython-176f07dadf80c27348486fdd8017fabbb30da842.zip cpython-176f07dadf80c27348486fdd8017fabbb30da842.tar.gz cpython-176f07dadf80c27348486fdd8017fabbb30da842.tar.bz2 |
Issue #12040: Expose a new attribute `sentinel` on instances of
:class:`multiprocessing.Process`. Also, fix Process.join() to not use
polling anymore, when given a timeout.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/multiprocessing/forking.py | 34 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 21 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 37 |
4 files changed, 90 insertions, 14 deletions
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 3d95557..3c359cb 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -101,10 +101,12 @@ else: if sys.platform != 'win32': import time + import select exit = os._exit duplicate = os.dup close = os.close + _select = util._eintr_retry(select.select) # # We define a Popen class similar to the one from subprocess, but @@ -118,8 +120,12 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() @@ -128,6 +134,11 @@ if sys.platform != 'win32': sys.stderr.flush() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -145,20 +156,14 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + r = _select([self.sentinel], [], [], timeout)[0] + if not r: + return None + # This shouldn't block if select() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: @@ -258,6 +263,7 @@ else: self.pid = pid self.returncode = None self._handle = hp + self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 3fb9ff6..99ee532 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -132,6 +132,7 @@ class Process(object): else: from .forking import Popen self._popen = Popen(self) + self._sentinel = self._popen.sentinel _current_process._children.add(self) def terminate(self): @@ -218,6 +219,17 @@ class Process(object): pid = ident + @property + def sentinel(self): + ''' + Return a file descriptor (Unix) or handle (Windows) suitable for + waiting for process termination. + ''' + try: + return self._sentinel + except AttributeError: + raise ValueError("process not started") + def __repr__(self): if self is _current_process: status = 'started' diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 30b7a85..7949d3a 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -32,9 +32,11 @@ # SUCH DAMAGE. # +import functools import itertools import weakref import atexit +import select import threading # we want threading to install it's # cleanup function before multiprocessing does @@ -315,3 +317,22 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () + + +# +# Automatic retry after EINTR +# + +def _eintr_retry(func, _errors=(EnvironmentError, select.error)): + @functools.wraps(func) + def wrapped(*args, **kwargs): + while True: + try: + return func(*args, **kwargs) + except _errors as e: + # select.error has no `errno` attribute + if e.args[0] == errno.EINTR: + continue + raise + return wrapped + diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 0c05ff6..85094cc 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -71,6 +71,23 @@ HAVE_GETVALUE = not getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False) WIN32 = (sys.platform == "win32") +if WIN32: + from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0 + + def wait_for_handle(handle, timeout): + if timeout is None or timeout < 0.0: + timeout = INFINITE + else: + timeout = int(1000 * timeout) + return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0 +else: + from select import select + _select = util._eintr_retry(select) + + def wait_for_handle(handle, timeout): + if timeout is not None and timeout < 0.0: + timeout = None + return handle in _select([handle], [], [], timeout)[0] # # Some tests require ctypes @@ -307,6 +324,26 @@ class _TestProcess(BaseTestCase): ] self.assertEqual(result, expected) + @classmethod + def _test_sentinel(cls, event): + event.wait(10.0) + + def test_sentinel(self): + if self.TYPE == "threads": + return + event = self.Event() + p = self.Process(target=self._test_sentinel, args=(event,)) + with self.assertRaises(ValueError): + p.sentinel + p.start() + self.addCleanup(p.join) + sentinel = p.sentinel + self.assertIsInstance(sentinel, int) + self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) + event.set() + p.join() + self.assertTrue(wait_for_handle(sentinel, timeout=DELTA)) + # # # |